Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redirect error not surfaced to client #358

Open
jeolted opened this issue Feb 11, 2025 · 5 comments
Open

Redirect error not surfaced to client #358

jeolted opened this issue Feb 11, 2025 · 5 comments

Comments

@jeolted
Copy link

jeolted commented Feb 11, 2025

I have an issue where an amqp client connecting to Azure IoT Hub is expected to receive a redirect to EventHub. The IoT Hub sends an Attach frame with a redirect error but the go-amqp lib doesn't recognize it and drops it.

I'm not sure who's in the wrong here. The attach frame we get back expects to have the role sender but it has receiver. The session mux doesn't find the link in linksByKey map since the role is not what it expected.

I've written a simple test for reproduction of the error:

package amqp

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/amenzhinsky/iothub/common"
)

func TestRedirect(t *testing.T) {
	t.Setenv("DEBUG_LEVEL", "7")
	ctx := context.Background()
	sak := common.SharedAccessKey{
		HostName:            "iothub-sys-test.azure-devices.net",
		SharedAccessKeyName: "iothubowner",
		SharedAccessKey:     "<redacted>",
	}
	conn, err := Dial(ctx, "amqps://"+sak.HostName, &ConnOptions{
		Properties: map[string]any{
			"com.microsoft:client-version": "dev",
		},
	})
	if err != nil {
		t.Error(err)
	}
	defer conn.Close()

	sess, err := conn.NewSession(ctx, nil)
	if err != nil {
		t.Error(err)
	}

	err = putToken(ctx, sess, sak)
	if err != nil {
		t.Error(err)
	}

	_, err = sess.NewReceiver(ctx, "messages/events/", nil)
	if err == nil {
		t.Error("expected redirect error")
	}

	var linkErr *LinkError
	if errors.As(err, &linkErr) {
		if linkErr.RemoteErr == nil {
			t.Fatal("expected Remote Error")
		} else {
			if linkErr.RemoteErr.Condition != ErrCondLinkRedirect {
				t.Error("expected Redirect condition")
			}
		}
	} else {
		t.Error("expected LinkError")
	}
}

func putToken(ctx context.Context, sess *Session, sak common.SharedAccessKey) error {
	send, err := sess.NewSender(ctx, "$cbs", nil)
	if err != nil {
		return err
	}
	defer send.Close(context.Background())

	recv, err := sess.NewReceiver(ctx, "$cbs", nil)
	if err != nil {
		return err
	}
	defer recv.Close(context.Background())

	sas, err := sak.Token(sak.HostName, 1*time.Hour)
	if err != nil {
		return err
	}

	to := "$cbs"
	replyTo := "cbs"
	if err = send.Send(ctx, &Message{
		Value: sas.String(),
		Properties: &MessageProperties{
			To:      &to,
			ReplyTo: &replyTo,
		},
		ApplicationProperties: map[string]interface{}{
			"operation": "put-token",
			"type":      "servicebus.windows.net:sastoken",
			"name":      sak.HostName,
		},
	}, &SendOptions{}); err != nil {
		return err
	}

	msg, err := recv.Receive(ctx, &ReceiveOptions{})
	if err != nil {
		return err
	}
	if err = recv.AcceptMessage(ctx, msg); err != nil {
		return err
	}

	rc, ok := msg.ApplicationProperties["status-code"].(int32)
	if !ok {
		return errors.New("unable to typecast status-code")
	}
	if rc == 200 {
		return nil
	}
	rd, _ := msg.ApplicationProperties["status-description"].(string)
	return fmt.Errorf("code = %d, description = %q", rc, rd)
}

The log from amqp lib when running the test:

% go test -tags debug -run ^TestRedirect$ . 
20:42:39.725410 TX (openAMQP 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : wsi66BDQRpMStJjWmbaE8bp_OTA-bRx9RUfju6_EC6SVknWuYUuW6Q, Hostname: iothub-sys-test.azure-devices.net, MaxFrameSize: 65536, ChannelMax: 65535, IdleTimeout: 30s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:client-version:dev]}}
20:42:39.760757 RX (openAMQP 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Open{ContainerID : 86e66eaf07d64d47bed6c33787d59246, Hostname: , MaxFrameSize: 65535, ChannelMax: 1024, IdleTimeout: 4m0s, OutgoingLocales: [], IncomingLocales: [], OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:39.761098 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967294, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:39.795390 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:39.795780 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: iuoFMk8F9VxkVDf_tQMhCBnYsyP_abJl5km07pb6B_reqXPTHb2zNQ, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: $cbs, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:39.830597 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: iuoFMk8F9VxkVDf_tQMhCBnYsyP_abJl5km07pb6B_reqXPTHb2zNQ, Handle: 0, Role: Receiver, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: $cbs, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 1048576, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:39.830722 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 5000, Handle: 0, DeliveryCount: 0, LinkCredit: 50, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
20:42:39.830783 RX (Session 0xc0000a76c0): link iuoFMk8F9VxkVDf_tQMhCBnYsyP_abJl5km07pb6B_reqXPTHb2zNQ attached, input handle 0, output handle 0
20:42:39.830883 TX (Sender 0xc00021c000) (pause): target: "$cbs", link credit: 0, deliveryCount: 0
20:42:39.830909 TX (Sender 0xc00021c000) (enable): target: "$cbs", link credit: 50, deliveryCount: 0
20:42:39.830882 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: U9_hcQPs2zVl7DpIL7XsLuMXNBDS0YrplMEW1SQ7u1wLdTi5m9i_kw, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: $cbs, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:39.864713 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: U9_hcQPs2zVl7DpIL7XsLuMXNBDS0YrplMEW1SQ7u1wLdTi5m9i_kw, Handle: 1, Role: Sender, SenderSettleMode: mixed, ReceiverSettleMode: first, Source: source{Address: $cbs, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 1048576, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:39.864826 RX (Session 0xc0000a76c0): link U9_hcQPs2zVl7DpIL7XsLuMXNBDS0YrplMEW1SQ7u1wLdTi5m9i_kw attached, input handle 1, output handle 1
20:42:39.864999 TX (Sender 0xc00021c000) (enable): target: "$cbs", link credit: 49, deliveryCount: 1
20:42:39.864992 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
20:42:39.865159 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: 0000000000000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 288}}
20:42:39.899983 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: StateAccepted{}, Batchable: false}}
20:42:39.900065 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Transfer{Handle: 1, DeliveryID: 0, DeliveryTag: 00000000, MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 67}}
20:42:39.900135 TX (Sender 0xc00021c000) (enable): target: "$cbs", link credit: 49, deliveryCount: 1
20:42:39.900250 RX (Receiver 0xc000232000) (pause): source: "$cbs", inflight: 0, linkCredit: 0, deliveryCount: 1, messages: 1, unsettled: 1, settlementCount: 0, settleMode: first
20:42:39.900278 RX (Receiver 0xc000232000) (pause): source: "$cbs", inflight: 0, linkCredit: 0, deliveryCount: 1, messages: 0, unsettled: 1, settlementCount: 0, settleMode: first
20:42:39.900288 RX (Receiver 0xc000232000) (pause): source: "$cbs", inflight: 0, linkCredit: 0, deliveryCount: 1, messages: 0, unsettled: 1, settlementCount: 0, settleMode: first
20:42:39.900301 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: StateAccepted{}, Batchable: false}}
20:42:39.900449 RX (Receiver 0xc000232000) (auto): source: "$cbs", inflight: 0, linkCredit: 0, deliveryCount: 1, messages: 0, unsettled: 0, settlementCount: 1, settleMode: first
20:42:39.900463 RX (Receiver 0xc000232000) (flow): source: "$cbs", inflight: 0, curLinkCredit: 0, newLinkCredit: 1, drain: false, deliveryCount: 1, messages: 0, unsettled: 0, settlementCount: 1, settleMode: first
20:42:39.900486 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 1, DeliveryCount: 1, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}}
20:42:39.900590 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: true, Error: *Error(nil)}}
20:42:39.935797 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 1, Closed: true, Error: *Error(nil)}}
20:42:39.935856 TX (Sender 0xc00021c000) (enable): target: "$cbs", link credit: 49, deliveryCount: 1
20:42:39.935864 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 0, Closed: true, Error: *Error(nil)}}
20:42:39.970076 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 0, Closed: true, Error: *Error(nil)}}
20:42:39.970341 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: Attach{Name: SHUruhd5aFjbKnoL2pwBj7W4rCTBk50Zl557zKIvlzvbk8lHBQZeNQ, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: messages/events/, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: <nil> Outcomes: [], Capabilities: []}, Target: source{Address: , Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:40.003988 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Attach{Name: SHUruhd5aFjbKnoL2pwBj7W4rCTBk50Zl557zKIvlzvbk8lHBQZeNQ, Handle: 0, Role: Receiver, SenderSettleMode: unsettled, ReceiverSettleMode: first, Source: <nil>, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}}
20:42:40.004088 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:redirect, Description: , Info: map[address:amqps://iothub-ns-iothub-sys-63331565-b218e81bad.servicebus.windows.net:5671/iothub-sys-test/ hostname:iothub-ns-iothub-sys-63331565-b218e81bad.servicebus.windows.net network-host:iothub-ns-iothub-sys-63331565-b218e81bad.servicebus.windows.net port:5671]}}}
20:42:40.004235 TX (connWriter 0xc0000aad00) timeout 30s: Frame{Type: AMQP, Channel: 0, Body: End{Error: *Error{Condition: amqp:not-allowed, Description: received mismatched attach frame, Info: map[]}}}
20:42:40.038583 RX (connReader 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: End{Error: *Error(nil)}}
20:42:40.038766 TX (connWriter 0xc0000aad00): Frame{Type: AMQP, Channel: 0, Body: Close{Error: *Error(nil)}}
20:42:40.038936 RX (connReader 0xc0000aad00): terminal error: read tcp 192.168.50.202:57045->13.74.108.192:5671: use of closed network connection
--- FAIL: TestRedirect (0.50s)
    badredirect_test.go:57: expected LinkError
FAIL
FAIL	github.com/Azure/go-amqp	0.905s
FAIL
@jhendrixMSFT
Copy link
Member

jhendrixMSFT commented Feb 12, 2025

Thanks for providing such detailed info, it's super helpful.

I think that IoT is sending a malformed Attach frame. As you noted, the Role is incorrect which causes our look-up to fail.

Per https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-idp315568

"Note that if the application chooses not to create a terminus, the session endpoint will still create a link endpoint and issue an attach indicating that the link endpoint has no associated local terminus. In this case, the session endpoint MUST immediately detach the newly created link endpoint."

We can see that the peer didn't create a terminus (Source is nil, and Target for that matter) and that we should expect a Detach frame (you can see that here). However, IoT should be sending a correctly formed Attach frame even if its intent is to immediately detach.

I'm not sure who owns this part of IoT but will ask around. I'll leave this open for the time being in case more info is required.

@jhendrixMSFT
Copy link
Member

@jeolted as this is an issue with the service, can you please open an issue with Azure support.

@jeolted
Copy link
Author

jeolted commented Feb 19, 2025

Unfortunately, my support plan doesn't allow me to open technical tickets on services.

@jhendrixMSFT
Copy link
Member

Understood. I've routed this internally to the IoT Hub folks to investigate further.

@jeolted
Copy link
Author

jeolted commented Feb 19, 2025

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants