Control the number of retries of a message with x-death header in RabbitMQ

Sometimes the processing of a message causes an error. In this case, it is common to move this message to another queue to try to process it later. All this can be automated in RabbitMQ with the DLX / DLK option and the application of a TTL. If you are not familiar with this subject, I advise you to read my article on the use of Dead Letter in RabbitMQ to tempost messages in case of error. The problem with this mechanism is that a message whose processing causes an error will be retried within a certain time until the processing is successful. We can therefore, with certain conditions, create an infinite processing loop.

x-death header

The x-death header is automatically added or filled by RabbitMQ when a message is discarded from a queue. It has several pieces of information per queue like the number of times it has been rejected.

array:1 [
  "headers" => array:4 [
    "x-death" => array:2 [
      0 => array:6 [
        "count" => 1
        "exchange" => "acme"
        "queue" => "timeline_retry10"
        "reason" => "expired"
        "routing-keys" => array:1 [
          0 => "timeline_retry10"
        ]
        "time" => AMQPTimestamp {#258
          -timestamp: "1534514728"
        }
      ]
      1 => array:6 [
        "count" => 1
        "reason" => "rejected"
        "queue" => "timeline"
        "time" => AMQPTimestamp {#259
          -timestamp: "1534514718"
        }
        "exchange" => "acme"
        "routing-keys" => array:1 [
          0 => "timeline"
        ]
      ]
    ]
    "x-first-death-exchange" => "acme"
    "x-first-death-queue" => "timeline"
    "x-first-death-reason" => "rejected"
  ]
]

Operating principle

As noted above, the x-death header is automatically added by RabbitMQ when a message is discarded from a queue. This implies that we will not be able to use the example of my article on DLX and DLK as it is, because if we publish by hand the message in a retry queue, the x-death header will be lost.


To illustrate the example, I will use a personal project. I save the user's activities and display it on a timeline. My problem is that under certain conditions, the worker fails because the message was consumed too early compared to other asynchronous processing. The scheduling of asynchronous tasks being perilous, I prefer to keep the current system and delay messages that have encountered an error to allow time for other tasks to run.

Because of this, when I consume a message from the "timeline" queue and the worker encounters an error, I NACK (not acknwoledge) the message without requeue option. RabbitMQ will discard the message from the queue. I don't want to lost the message, I had previously declared the queue with the x-dead-letter-exchange (DLX) and the x-dead-letter-routing-key (DLK) options to get the message deliver to "timeline_retry10" queue. The "timeline_retry10" queue is also declared with the DLX and DLK options for the message to return to the "timeline" queue. For this transfer to be automatic, the queue "timeline_retry10" also has the option x-message-ttl so messages are automatically discard after 10 seconds.

To stop this endless loop, in my worker when I encounter an error, I look if the x-death header is present for my current queue. If this is the case and the counter "count" is greater than a value I set, I publish the message in an error queue and I ACK (acknowledge) the message  because I don't want it to stay in the "timeline" queue.

Queues declaration

I use rabbitmqadmin cli tool to declare this configuration. You can get this tool from your RabbitMQ instance if you had installed administration plugin by going to this uri http://<domain>:15672/cli/rabbitmqadmin

"Timeline" queue declaration with DLX, DLK options and binding.

$ rabbitmqadmin --vhost=acme declare queue \
> name=timeline durable=true \
> arguments='{"x-dead-letter-exchange": "acme", "x-dead-letter-routing-key": "timeline_retry10"}'
 
$ rabbitmqadmin --vhost=acme declare binding source=acme destination=timeline routing_key=timeline
 

"Timeline_retry10" queue declaration with DLX, DLK, TTL options and binding.

$ rabbitmqadmin --vhost=acme declare queue \ 
> name=timeline_retry10 durable=true \
> arguments='{"x-dead-letter-exchange": "acme", "x-dead-letter-routing-key": "timeline", "x-message-ttl": 10000}'
 
$ rabbitmqadmin --vhost=acme declare binding source=acme destination=timeline_retry10 routing_key=timeline_retry10

Usage in worker

X-death header will have different structure in PHP depends if you are using AMQP Library or PHP extension. Library  will provide the header as an instance of "PhpAmqpLib\wire\AMQPArray" class and extension will provide header as an array.

Usage of pecl extension amqp

<?php 
//init RabbitMQ connection
 
/* @var \AMQPQueue $queue */
 
$message = $queue->get();
try {
    // message processing
} catch (\Throwable $e) {
    $headers = $message->getHeaders();
    if (isset($headers['x-death'])) {
        foreach ($headers['x-death'] as $xDeath) {
            if (isset($xDeath['queue']) && $xDeath['queue'] == 'timeline' && $xDeath['count'] > 5) {
                //too many retry, we publish message in error queue and ack.
                $queue->ack($message->getId());
            }
        }
    }
    //nack message to let him go in retry queue.
    $queue->nack($message->getId(), false);
}
 

Usage of phpamqplib library

I'm using getNativeData() method from AMQPArray class to get a simple array and have the same base code as above.

<?php 
//init RabbitMQ conenction
 
/* @var PhpamqpLib\Channel\AMQPChannel $channel */
 
$message = $channel->basic_get('timeline');
try {
    // message processing
} catch (\Throwable $e) {
    $headers = $message->get('application_headers);
    if (isset($headers['x-death'])) {
        $xDeaths = $headers['x-death'][1]->getNativeData();
        foreach ($xDeaths as $xDeath) {
            if (isset($xDeath['queue']) && $xDeath['queue'] == 'timeline' && $xDeath['count'] > 5) {
                //too many retry, we publish message in error queue and ack.
                $channel->basic_ack($message->getId());
            }
        }
    }
    //nack message to let him go in retry queue.
    $queue->basic_nack($message->getId(), false, false);
}

Usage with Swarrot

If you are using swarrot (library or symfony bundle), It provide a processor to use x-death header.


This article is finished, I hope it will have shown you a new approach to RabbitMQ. The mechanics described above is simple to use and its only limitation is that it is not possible to use several retry queue with different TTL.

Comments are temporary disable because of spammers.

Search

Post's details