Controler le nombre de réessaie d'un message avec x-death dans RabbitMQ

Parfois le traitement d'un message provoque une erreur. Dans ce cas, il est courant de déplacer ce message dans une autre queue pour réessayer de le traiter plus tard. Tout cela peut être automatisé dans RabbitMQ avec les option DLX/DLK et l'application d'un TTL. Si vous n'êtes pas famillié avec ce sujet, je vous conseille de lire mon article sur l'utilisation de Dead Letter dans RabbitMQ pour temposier les messages en cas d'erreur. Le problème avec ce mécanisme c'est qu'un message dont le traitement provoque une erreur sera retenter dans un certains délai et ce jusqu'à ce que le traitement réussisse. On peut donc, avec certaines conditions, créer une boucle de traitement infini.

Le header x-death

Le header x-death est automatiquement ajouté ou complété par RabbitMQ quand un message est rejeté d'une queue. Il comporte plusieurs informations par queue dont le nombre de fois qu'il a été rejeté.

array:1 [
  "headers" => array:4 [
    "x-death" => array:2 [
      0 => array:6 [
        "count" => 1
        "exchange" => "acme"
        "queue" => "timeline_retry10"
        "reason" => "expired"
        "routing-keys" => array:1 [
          0 => "timeline_retry10"
        ]
        "time" => AMQPTimestamp {#258
          -timestamp: "1534514728"
        }
      ]
      1 => array:6 [
        "count" => 1
        "reason" => "rejected"
        "queue" => "timeline"
        "time" => AMQPTimestamp {#259
          -timestamp: "1534514718"
        }
        "exchange" => "acme"
        "routing-keys" => array:1 [
          0 => "timeline"
        ]
      ]
    ]
    "x-first-death-exchange" => "acme"
    "x-first-death-queue" => "timeline"
    "x-first-death-reason" => "rejected"
  ]
]
 
 

Principe de fonctionnement

Comme indiqué au dessus, le header x-death est ajouté automatiquement par RabbitMQ quand un message est rejeté d'une queue. Cela implique que l'on ne va pas pouvoir utiliser l'exemple de mon article sur DLX et DLK tel quel, car si on publie à la main le message dans une queue de retry, le header x-death sera perdu.

Pour illustrer l'exemple, je vais utiliser un projet perso. J'enregistre les activités de l'utilisateur pour lui afficher sur une timeline. Mon problème c'est que dans certaines conditions, le worker échoue car le message a été consommé trop tôt par rapport à d'autre traitement asynchrone. L'ordonnancement de tâches asynchrones étant périlleux, je préfère garder le système actuel et temporiser les messages qui ont rencontré une erreur pour laisser le temps aux autres taches de s’exécuter.

De ce fait, quand je consomme un message de la queue "timeline" et que le worker rencontre une erreur, je NACK (not acknwoledge) le message sans le requeue. RabbitMQ va le rejeté de la queue. Pour qu'il ne soit pas perdu à jamais, j'avais au préalable déclaré la queue avec les option x-dead-letter-exchange (DLX) et x-dead-letter-routing-key (DLK) pour que le message se retrouve dans la queue "timeline_retry10". La queue "timeline_retry10" est également déclarée avec les options DLX et DLK pour que le message retourne dans le queue "timeline". Pour que ce transfert soit automatique, la queue "timeline_retry10" a également l'option x-message-ttl pour que les messages sot automatiquement rejetés au bout de 10 secondes.

Pour arrêter cette boucle sans fin, dans mon worker quand je rencontre une erreur, je regarde si le header x-death est présent pour ma queue en cours. Si c'est le cas et que le compteur "count" est supérieur à une valeur que j'ai paramétré, dans ce cas je publie le message dans une queue d'erreur et je ACK (acknowledge) le message pour qu'il ne reste pas dans la queue "timeline".

Déclaration des queues

Pour déclarer la configuration j'utilise le binaire rabbitmqadmin en ligne de commande. Il peut être télécharger depuis le plugin d'administration s'il est installé depuis l'url http://<domain>:15672/cli/rabbitmqadmin

Déclaration de la queue "timeline" avec ses options DLX et DLK et binding sur l'exchange.

$ rabbitmqadmin --vhost=acme declare queue \
> name=timeline durable=true \
> arguments='{"x-dead-letter-exchange": "acme", "x-dead-letter-routing-key": "timeline_retry10"}'
 
$ rabbitmqadmin --vhost=acme declare binding source=acme destination=timeline routing_key=timeline
 

Déclaration de la queue "timeline_retry10" avec ses options DLX, DLK et TTL ainsi que son binding.

$ rabbitmqadmin --vhost=acme declare queue \ 
> name=timeline_retry10 durable=true \
> arguments='{"x-dead-letter-exchange": "acme", "x-dead-letter-routing-key": "timeline", "x-message-ttl": 10000}'
 
$ rabbitmqadmin --vhost=acme declare binding source=acme destination=timeline_retry10 routing_key=timeline_retry10

Utilisation dans le worker

Coté PHP le header x-death aura une structure différente suivant que l'on utilise la librairie AMQP ou l'extension PHP. La librairie renvoie le header sous la forme d'une instance de la class "PhpAmqpLib\wire\AMQPArray" alors que l'extension renvoie un simple tableau PHP.

Utilisation de l'extension pecl amqp

<?php 
//init de la connexion à RabbitMQ
 
/* @var \AMQPQueue $queue */
 
$message = $queue->get();
try {
    // traitement du message
} catch (\Throwable $e) {
    $headers = $message->getHeaders();
    if (isset($headers['x-death'])) {
        foreach ($headers['x-death'] as $xDeath) {
            if (isset($xDeath['queue']) && $xDeath['queue'] == 'timeline' && $xDeath['count'] > 5) {
                //Trop d'essaie, on publie le message en queue d'erreur et on le ack.
                $queue->ack($message->getId());
            }
        }
    }
    //on nack le message pour le réessayer plus tard.
    $queue->nack($message->getId(), false);
}
 

Utilisation de la librairie phpamqplib

Je transforme la classe AMQPArray en un tableau avec la méthode getNativeData() pour simplifier l'utilisation.

<?php 
//init de la connexion à RabbitMQ
 
/* @var PhpamqpLib\Channel\AMQPChannel $channel */
 
$message = $channel->basic_get('timeline');
try {
    // traitement du message
} catch (\Throwable $e) {
    $headers = $message->get('application_headers');
    if (isset($headers['x-death'])) {
        $xDeaths = $headers['x-death'][1]->getNativeData();
        foreach ($xDeaths as $xDeath) {
            if (isset($xDeath['queue']) && $xDeath['queue'] == 'timeline' && $xDeath['count'] > 5) {
                //Trop d'essaie, on publie le message en queue d'erreur et on le ack.
                $channel->basic_ack($message->getId());
            }
        }
    }
    //on nack le message pour le réessayer plus tard.
    $queue->basic_nack($message->getId(), false, false);
}

Utilisation de Swarrot

Si vous utilisez swarrot (la librairie ou le bundle symfony), il y a un processor disponible pour traiter x-death dans les headers du message.


Cet article est terminé, j'espère qu'il vous aura montré une nouvelle approche de RabbitMQ. La mécanique décrit au dessus est simple à utiliser sa seule limitation est qu'il n'est pas possible d'utiliser plusieurs queues de retry avec des TTL différents.

Ajouter un commentaire