This guide is intended to assist in the migration from the pre-release azure-service-bus-go package to the latest beta releases (and eventual GA) of the github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus package.
The redesign of the Service Bus SDK offers better integration with Azure Identity and a simpler API surface that lets you work uniformly with queues, topics, subscriptions, and subqueues (for instance, dead letter queues).
The redesign also changes the way that clients are created. We wanted to reduce the number of types needed to get started, while also making it clear how you, as a user of the SDK, manage the resources the SDK creates (connections, links, and so on).
- Namespace to Client migration
- Sending messages
- Sending messages in batches
- Processing and receiving messages
- Using dead letter queues
One big change is that the top-level "client" is now Client, not Namespace:
Previous code:
// previous code
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
New (using azservicebus):
// new code
client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)
You can also use azidentity credentials. See the Azure Identity integration section below.
Sending is done from a Sender, which works the same for queues or topics:
sender, err := client.NewSender(queueOrTopicName, nil)
err = sender.SendMessage(context.TODO(), &azservicebus.Message{
	Body: []byte("hello world"),
}, nil)
Sending messages in batches is similar, except that the focus has shifted toward giving the user full control through the MessageBatch type.
// Create a message batch. It will automatically be sized for the Service Bus
// namespace's maximum message size.
messageBatch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
	panic(err)
}
// Add a message to our message batch. This can be called multiple times.
err = messageBatch.AddMessage(&azservicebus.Message{
	Body: []byte("hello world"),
}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
	fmt.Printf("Message batch is full. We should send it and create a new one.\n")
	// Send what we have, since the batch is full.
	err := sender.SendMessageBatch(context.TODO(), messageBatch, nil)
	if err != nil {
		panic(err)
	}
	// Create a new batch, add this message and start again.
} else if err != nil {
	panic(err)
}
Receiving is now pull-based rather than callback-driven.
You create a Receiver and use it to request messages in batches:
receiver, err := client.NewReceiverForQueue(queue, nil)
// or for a subscription
receiver, err := client.NewReceiverForSubscription(topicName, subscriptionName, nil)
// receiving multiple messages at a time.
messages, err := receiver.ReceiveMessages(context.TODO(), numMessages, nil)
Previously, you created a receiver through an entity struct, like Queue or Subscription:
// previous code
queue, err := ns.NewQueue()
deadLetterReceiver, err := queue.NewDeadLetterReceiver()
// or
topic, err := ns.NewTopic("topic")
subscription, err := topic.NewSubscription("subscription")
deadLetterReceiver, err := subscription.NewDeadLetterReceiver()
// the resulting receiver was a `ReceiveOner` which had different
// functions than some of the more full-fledged receiving types.
Now, in azservicebus:
// new code
receiver, err := client.NewReceiverForQueue(
	queueName,
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
		SubQueue:    azservicebus.SubQueueDeadLetter,
	})
// or
receiver, err := client.NewReceiverForSubscription(
	topicName,
	subscriptionName,
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
		SubQueue:    azservicebus.SubQueueDeadLetter,
	})
The Receiver type for a dead letter queue is the same as the receiver for a queue or subscription, making things more consistent.
Message settlement functions have moved to the Receiver, rather than being on the Message.
Previously:
// previous code
receiver.Listen(ctx, servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
	m.Complete(ctx)
	return nil
}))
Now, using azservicebus:
// new code
// with a Receiver
messages, err := receiver.ReceiveMessages(ctx, 10, nil)
for _, message := range messages {
	err = receiver.CompleteMessage(ctx, message, nil)
}
Azure Identity has been directly integrated into the Client via the NewClient() function. This allows you to take advantage of conveniences like DefaultAzureCredential or any of the other credential types supported by the azidentity package.
In azservicebus:
// import "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
// `DefaultAzureCredential` tries several common credential types. For more credential types
// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
credential, err := azidentity.NewDefaultAzureCredential(nil)
client, err := azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil)
Administration features, like creating queues, topics and subscriptions, have been moved into a dedicated client (admin.Client).
// note: import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
adminClient, err := admin.NewClientFromConnectionString(connectionString, nil)
// create a queue with default properties
resp, err := adminClient.CreateQueue(context.TODO(), "queue-name", nil)
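// or create a queue and configure some properties
// (MaxDeliveryCount is only an illustrative choice; see admin.QueueProperties for the full set)
maxDeliveryCount := int32(10)
resp, err = adminClient.CreateQueue(context.TODO(), "queue-name", &admin.CreateQueueOptions{
	Properties: &admin.QueueProperties{
		MaxDeliveryCount: &maxDeliveryCount,
	},
})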
Entities that use sessions can now be received from:
// to get a specific session by ID
sessionReceiver, err := client.AcceptSessionForQueue(context.TODO(), "queue", "session-id", nil)
// or client.AcceptSessionForSubscription
// to get the next available session from Service Bus (service-assigned)
sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), "queue", nil)
// SessionReceivers are similar to Receivers, with some additional functions:
// managing session state
sessionData, err := sessionReceiver.GetSessionState(context.TODO(), nil)
err = sessionReceiver.SetSessionState(context.TODO(), []byte("data"), nil)
// renewing the lock associated with the session
err = sessionReceiver.RenewSessionLock(context.TODO(), nil)