-
Notifications
You must be signed in to change notification settings - Fork 595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consumers do not resume on AutoRecovery because ConsumerWorkService is Shutdown #50
Comments
By "connection was closed" you mean "connection was lost", that is, not closed deliberately by the app, correct? |
Correct, I forced a connection closed from the management console. the connection then recovered, redeclared its models and topology. The message gets delivered to the client, but never handed to the consumer. |
Thanks, we'll investigate. |
Manual testing with a couple of F# scripts also doesn't reveal the problem. Publisher#r "RabbitMQ.Client.dll"
open System
open RabbitMQ.Client
open System.Text
open System.Collections.Generic
let enc = Encoding.UTF8
let rnd = new Random()
let cf = new ConnectionFactory()
let conn = cf.CreateConnection()
let ch = conn.CreateModel()
let q = "dotnet-issue-50"
ch.QueueDeclare(q, true, false, false, null) |> ignore
for i in 1 .. 100 do
let msg = "Message " + i.ToString()
let body = enc.GetBytes(msg)
ch.BasicPublish("", q, null, body)
printfn " [x] Sent %s" msg
conn.Close() Consumer#r "RabbitMQ.Client.dll"
open System
open RabbitMQ.Client
open System.Text
open System.Collections.Generic
open System.Threading
let enc = Encoding.UTF8
let rnd = new Random()
let cf = new ConnectionFactory(AutomaticRecoveryEnabled = true, UseBackgroundThreadsForIO = false)
let conn = cf.CreateConnection()
let ch = conn.CreateModel()
let q = "dotnet-issue-50"
ch.QueueDelete(q)
ch.QueueDeclare(q, true, false, false, null) |> ignore
type MyConsumer(ch: IModel) =
inherit DefaultBasicConsumer(ch)
override this.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange,
routingKey, props, body) =
let msg = enc.GetString(body)
printfn "=> %s" msg
ch.BasicConsume(q, true, new MyConsumer(ch))
let rec loop(): unit =
Thread.Sleep(5000)
loop()
loop() Steps Taken to Reproduce
|
Inspecting the code suggests that every |
Looks like in 3.5.0 HandleBasicDeliver is delegated to the dispatcher. Trouble seems to be when the connection was closed, Shutdown was called on the work service, clearing all the keys out. Nothing re-registers the keys with the work service when the connection is restored.
[RabbitMQ.Client.Impl.RecoveryAwareModel] {Session#1:Connection(aa6ca86e-62e9-4a25-aac2-dd2a6fc66aa7,amqp://localhost:5672)} RabbitMQ.Client.Impl.RecoveryAwareModel
The given key was not present in the dictionary.
at System.Collections.Generic.Dictionary
2.get_Item(TKey key) at RabbitMQ.Util.BatchingWorkPool
2.AddWorkItem(K key, V item) in projects\client\RabbitMQ.Client\src\util\BatchingWorkPool.cs:line 65The text was updated successfully, but these errors were encountered: