
question about dead letter exchange | x-death #494

Closed
fas3r opened this issue Mar 7, 2019 · 7 comments

Comments

@fas3r

fas3r commented Mar 7, 2019

Hello,

I'm testing RabbitMQ and I came up with the setup below in order to use a dead letter queue / dead letter exchange + routing key.

The image below represents the flow through the queues/exchanges.

The message arrives in the GQL-expire-agents queue on the left and is consumed by the _sendMsgToRetry function, which rewrites the message and sends it to the dead letter exchange with a routing key.

After being held in that queue, the message goes through QLX-retry-expire-agents, back to the GQL-expire-agents queue, and is consumed again by _sendMsgToRetry ... and so on, until it reaches the maximum number of retries.

In detail, the flow looks like this:

msg_arrive --> GQL-expire-agents --> _sendMsgToRetry --> fnOnRetry --> dead exchanger --> routing --> queue holding msg ( TQL-expire...1,2 and 3) --> QLX fanout exchange --> GQL-expire-agents --> _sendMsgToRetry --> fnOnRe.... .... fnOnTerminate

where :

  • _sendMsgToRetry : the function that reformats the msg and publishes it to the exchange
  • fnOnRetry : the function inside _sendMsgToRetry representing the consumer to use while retrying
  • fnOnTerminate : the function inside _sendMsgToRetry representing the consumer to use after the maximum number of attempts.
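
For context, the topology can be declared roughly as in the sketch below; the exchange/queue names and TTL values are illustrative guesses based on the picture and the description above, not the exact configuration.

```js
// Sketch only: names and TTLs are guesses based on the description above.
// Work queue <- fanout return exchange <- per-attempt hold queues <- direct retry exchange.
async function assertRetryTopology(channel) {
  const workQueue = 'GQL-expire-agents';
  const retryExchange = 'TLX-retry-expire-agents';   // direct exchange, routed by attempt number
  const returnExchange = 'QLX-retry-expire-agents';  // fanout exchange, feeds the work queue again

  await channel.assertExchange(retryExchange, 'direct', { durable: true });
  await channel.assertExchange(returnExchange, 'fanout', { durable: true });
  await channel.assertQueue(workQueue, { durable: true });
  await channel.bindQueue(workQueue, returnExchange, '');

  // One hold queue per attempt; each holds the message for its TTL, then dead-letters it back.
  const delaysMs = [60000, 300000, 900000]; // illustrative delays
  for (let attempt = 1; attempt <= delaysMs.length; attempt++) {
    const holdQueue = `TQL-expire-agents-${attempt}`;
    await channel.assertQueue(holdQueue, {
      durable: true,
      messageTtl: delaysMs[attempt - 1],   // x-message-ttl
      deadLetterExchange: returnExchange,  // x-dead-letter-exchange
    });
    await channel.bindQueue(holdQueue, retryExchange, `retry-expire-agents-${attempt}`);
  }
}
```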

I bind the consumer function _sendMsgToRetry to the GQL-expire-agents queue shown in the picture; this is done where the queues are created.

## Part 1

    // Bind the GQLQueue to the consumer
    for (let i = 1; i <= nbConsumer; i++) {
      seq.concat(channel.consume(GQLQueue, msg => {
        try {
          channel.ack(msg);
          return _sendMsgToRetry(channel, queue, msg, fnOnRetry, fnOnTerminate, max_tries);
        } catch (err) {
          throw err;
        }
      }));
    }

Here is the function that I use to check the number of attempts, reformat the message, and republish to the dead letter exchange with the appropriate routing key. This is where the consumers fnOnRetry and fnOnTerminate are called.

## Part 2

    function _sendMsgToRetry(channel, queue, msg, onRetry, onTerminate, max_attempt) {

      // Parse the content and retrieve the number of attempts
      function getAttemptAndContent(msg) {
        let content = JSON.parse(msg.content.toString(contentEncoding));

        // The "exchange" field should exist, but who knows; without it we would get an endless loop,
        // because the native msg.fields.exchange is changed after walking through the DLX
        content.exchange = content.exchange || msg.fields.exchange;
        content.try_attempt = ++content.try_attempt || 1;
        const attempt = content.try_attempt;

        // What I don't like much: re-serialising the content just to carry the counter
        content = Buffer.from(JSON.stringify(content), contentEncoding);
        return { attempt, content };
      }

      // Get the attempt count and content
      const { attempt, content } = getAttemptAndContent(msg);

      // Check the number of attempts
      if (attempt <= max_attempt) {

        // Build the routing key for this attempt so it matches "retry-expire-agents-1",
        // "retry-expire-agents-2" and "retry-expire-agents-3"
        const typeQ = _find(config.queues, { name: queue }).typeQ;
        const prefixQ = _find(config.queues, { name: queue }).prefixQ;
        const TLXExchange = _setTLXName(typeQ, prefixQ, queue);
        const routingKey = _setRoutingKey(typeQ, prefixQ, queue, attempt);

        // Consumer function to use while retrying
        onRetry(msg, channel);

        const options = {
          contentEncoding,
          contentType: contentTypeJson,
          persistent: true,
        };

        // Trying to reproduce the original message,
        // including msg.properties.messageId and such,
        // but excluding msg.fields.redelivered
        Object.keys(msg.properties).forEach(key => {
          options[key] = msg.properties[key];
        });

        // Publish back to the dead letter exchange (QLX-retry-expire-agents on the picture)
        return channel.publish(TLXExchange, routingKey, content, options);
      }

      // If the maximum number of attempts is reached, use this function
      onTerminate(msg, channel);
      return Promise.resolve();
    }

[image: diagram of the queue/exchange flow described above]

I put in place a test that sends ~100K msg/minute and it seems to do the job with 3 nodes in a cluster. In reality I don't expect to receive that amount ... far from it ... the final usage is just to have some kind of persistent scheduler/countdown.

We looked into other products and JS modules using Redis, but in the end we just need a separate, persistent service that is able to repeat some actions at a defined interval (1 minute to 3 days in the worst case).

My questions are the following: in terms of code, what should I do to avoid a custom "check" in the content of the message?
I read about x-death, but I did not manage to access/set/read it ... if you have any idea: from what I read in the API documentation I should be able to set 'x-death' from _sendMsgToRetry, no?

Please mind the Promise.resolve() and the concat(...) in the code.

Thanks in advance.

Fas3r

@cressie176
Collaborator

Hi @fas3r,

One of the limitations of the amqp protocol is that it doesn't provide a way to check the number of times a message has been redelivered / republished. As far as I can tell, your only two options are

  1. Modify the message and republish (the option you picked)
  2. Ensure the message producer sets a unique message id and store a count externally (the option you rejected)

Things you might consider for option 1...

  1. You can set an "x-attempts" header instead of modifying the message content, and use a headers exchange to route to your delay queues (see the sketch after this list)

  2. You could parse the routing key to get the number of attempts instead of modifying the message content.

  3. Be careful with x-death, it can crash older versions of RabbitMQ
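
For example, a rough sketch of option 1: the exchange/queue names, binding setup and thresholds below are placeholders, not your code or Rascal's.

```js
// Sketch: count retries in an "x-attempts" header instead of the body,
// and let a headers exchange route each attempt to its delay queue.
async function bindDelayQueuesByAttempt(channel, headersExchange, maxAttempts) {
  await channel.assertExchange(headersExchange, 'headers', { durable: true });
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Route messages whose x-attempts header equals this attempt number.
    await channel.bindQueue(`delay-queue-${attempt}`, headersExchange, '', {
      'x-match': 'all',
      'x-attempts': attempt,
    });
  }
}

function republishWithAttemptHeader(channel, msg, headersExchange) {
  const headers = Object.assign({}, msg.properties.headers);
  headers['x-attempts'] = (headers['x-attempts'] || 0) + 1;
  // Body stays untouched; only the headers change. The routing key is ignored by a headers exchange.
  return channel.publish(headersExchange, '', msg.content, {
    ...msg.properties, // keep messageId, contentType, etc.
    persistent: true,
    headers,
  });
}
```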

I maintain a wrapper for amqplib which also handles delayed retries in a similar way to what you describe. It's worth checking out if only to give you some more ideas.

@fas3r
Author

fas3r commented Mar 7, 2019

Hello @cressie176 ,

First, thanks for your message.

Things you might consider for option 1...

You can set a "x-attempts" header instead of modifying the message content, and use a headers exchange to route to your delay queues

You could parse the routing key to get the number of attempts instead of modifying the message content.

That's exactly what I was thinking of: since x-death would already have the first routing key set, I could just parse it and republish accordingly.
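
For reference, RabbitMQ populates the x-death header array itself when it dead-letters a message (e.g. when its TTL expires in a hold queue), so it is read by the application rather than set by it. A minimal reading sketch (the helper name is made up):

```js
// Sketch: read the x-death header added by the broker on dead-lettering.
function getDeathInfo(msg) {
  const deaths = (msg.properties.headers && msg.properties.headers['x-death']) || [];
  if (deaths.length === 0) return null;

  const latest = deaths[0]; // most recent dead-lettering event
  return {
    count: latest.count,                         // times the message died in that queue
    reason: latest.reason,                       // e.g. 'expired' or 'rejected'
    fromQueue: latest.queue,                     // the queue it was dead-lettered from
    originalRoutingKeys: latest['routing-keys'], // routing keys it carried
  };
}
```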

Regarding option 2, I did not know that we could expect an id to be returned from the producer; I will check that out. Also, thanks for the link to the wrapper. While reading it, I guess you are referring to this in the README?

It's important to realise that even though publication emits a "success" event, this offers no guarantee that the message has been sent UNLESS you use a confirm channel. Providing you use Rascal's defaults publications will always be confirmed.

I did not know about Rascal ... I wrote the functions for the mapping configuration/logic myself ... :/ I will check it out for sure.

Thanks

@cressie176
Collaborator

cressie176 commented Mar 7, 2019

I did not know that we could expect an id to be returned from the producer,

The producer has to set the id. There is a field for it, but there's no guarantee the id is set. Rascal will default the id to a UUID if one isn't already specified.
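
Not Rascal's actual code, but the idea with plain amqplib looks roughly like this (assuming Node's built-in crypto.randomUUID):

```js
// Sketch: the publisher sets messageId itself, defaulting to a UUID when none is supplied.
const { randomUUID } = require('crypto');

function publishWithId(channel, exchange, routingKey, content, options = {}) {
  const messageId = options.messageId || randomUUID();
  return channel.publish(exchange, routingKey, content, { ...options, messageId, persistent: true });
}
```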

It's important to realise that even though publication emits a "success" event, this offers no guarantee that the message has been sent UNLESS you use a confirm channel. Providing you use Rascal's defaults publications will always be confirmed.

No, that's talking about confirm channels - nothing to do with message ids.
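
For completeness, a minimal confirm channel sketch with plain amqplib (the connection setup and names are assumed):

```js
// Sketch: publisher confirms; `conn` is an existing amqplib connection.
async function publishConfirmed(conn, exchange, routingKey, payload) {
  const channel = await conn.createConfirmChannel();
  channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(payload)), { persistent: true });
  await channel.waitForConfirms(); // resolves once the broker acks, rejects on nack
  await channel.close();
}
```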

@fas3r
Author

fas3r commented Mar 7, 2019

@cressie176 ,

Yep, you are right. I just checked the field reference; I had confused it with the correlation-id, which has a completely different purpose.

On my side it's not as extensive as yours; I add cases one by one, and I can declare classical queues/exchanges via the mapping, but I don't expect to use that part much.

So far I handle only 2 cases:

  • retry x times;
  • "wait and delete";

The "wait and delete" is very similar than the "retries x times" case as it's also using the dead letter + routing key, it just hold msg in a queue with the appropriate TTL. Afterwards, it goes back to the original queue . As the msg is marked as republished it get consume and delete stuff in cache/db.

Regarding

The producer has to set the id. There is a field for it, but there's no guarantee the id is set. Rascal will default the id to a UUID if one isn't already specified.

I recall that from the docs. What do you do? Between each consumer, do you check and update a "tracking list"? The problem is what to do if the msg is delivered but you don't get "success", and vice versa.

I will definitely look at your code, e.g. how you manage the number of channels/connections and retry the msg if it fails. I also see that you are using the correlation-id; I need to add it.

Thanks

@cressie176
Collaborator

cressie176 commented Mar 8, 2019

I recall that from the docs. What do you do? Between each consumer, do you check and update a "tracking list"? The problem is what to do if the msg is delivered but you don't get "success", and vice versa.

There are two scenarios I'm concerned with.

  1. When a consumer encounters an error in a way it can handle
  2. When the message actually crashes the consumer, causing the message to be rolled back. This can cause an infinite loop.

For the first scenario I favour the delayed retry mechanism by forwarding a copy of the message + some tracking headers in a manner similar to what we've been discussing. If the number of attempts exceeds some threshold I permanently dead letter, and take manual intervention.

For the second scenario I track message ids of redelivered messages in redis, with an auto-expiry. If the attempts exceed a threshold I permanently dead letter. Because I only do this for redelivered messages it doesn't affect normal performance.

Both approaches require the consumer to tolerate duplicate messages.
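
A rough sketch of the second approach (the key prefix, TTL, threshold and the ioredis client are placeholders, not my actual implementation):

```js
// Sketch: count redeliveries per messageId in Redis with an auto-expiring key.
// Only consulted for redelivered messages, so the happy path is unaffected.
const Redis = require('ioredis');
const redis = new Redis();

async function shouldDeadLetter(msg, maxAttempts = 5, ttlSeconds = 3600) {
  if (!msg.fields.redelivered) return false; // only track redelivered messages
  const id = msg.properties.messageId;
  if (!id) return false;                     // can't track without a message id

  const key = `redeliveries:${id}`;
  const attempts = await redis.incr(key);                   // atomic per-message counter
  if (attempts === 1) await redis.expire(key, ttlSeconds);  // auto-expiry
  return attempts > maxAttempts;             // caller permanently dead-letters when true
}
```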

@fas3r
Author

fas3r commented Mar 9, 2019

Hello @cressie176 ,

I think for points 1 and 2 I will wait until I have more concrete tests through the app before looking at it, but it will be a very similar approach to yours, I guess.

Thanks again for all your detailed answers.

fas3r.

@cressie176
Collaborator

Welcome. Please close the issue if there's no further discussion needed.

fas3r closed this as completed Mar 9, 2019