Use Dead Letter in RabbitMQ to delay messages in case of error

RabbitMQ is fast. If the number of consumers is sized correctly, messages are consumed almost instantly, which can cause problems in some cases. I have lost count of the times I got a Doctrine NoResultException in a consumer because the message was consumed immediately, at a moment when database replication was lagging behind.

Simple is bad

In this case, the easiest thing to do is to NACK and requeue the message, so it goes back to the queue and waits for a consumer to pick it up again. The problem is that this message will be the next one consumed, and it may cause the same error if the database replication delay is still there. I said at the beginning that RabbitMQ is fast. With this head-in-the-sand strategy, we fall into a pseudo-infinite loop (consume the message > error > NACK) that lasts as long as replication needs to catch up. If consuming the message takes 0.5 seconds, a delay of one minute costs 120 consumptions of the same message for nothing. Besides loading the RabbitMQ server, this also puts unnecessary load on the database server.
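
The arithmetic behind that figure can be sketched in a few lines of Python. The function name and its parameters are made up for this example, and the model assumes the requeued message is redelivered immediately and that every attempt takes the same time.

```python
import math


def wasted_redeliveries(replication_delay_s: float, processing_time_s: float) -> int:
    """Estimate how many times a requeued message is consumed in vain.

    Assumes the message comes back right after each NACK and that
    every attempt takes the same amount of time.
    """
    if processing_time_s <= 0:
        raise ValueError("processing_time_s must be positive")
    return math.ceil(replication_delay_s / processing_time_s)


# One minute of replication delay, 0.5 s per attempt
print(wasted_redeliveries(60, 0.5))  # 120
```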

Another approach is to ACK the message so that it does not come back to the queue, then either send it by email together with the error or publish it to an error queue to be processed manually later. I chose this solution when I started using RabbitMQ because it is simple and easy to implement.

Let's say that I have a queue called "synchro_user" bound to a "synchro" exchange with the routing key "user".

Create the synchro_user queue
Bind the synchro_user queue

We can use the rabbitmqadmin CLI tool to declare this configuration. If the management plugin is installed, you can download the tool from your RabbitMQ instance at this URI: https://<domain>:15672/cli/rabbitmqadmin

$ ./rabbitmqadmin --vhost=/ declare queue \
> name=synchro_user durable=true
$ ./rabbitmqadmin --vhost=/ declare binding \
> source=synchro destination=synchro_user \
> routing_key=user

I declare another queue, "synchro_user_error", bound to the exchange "synchro_error" with the routing key "user_error".

Create the synchro_user_error queue
Bind the synchro_user_error queue

$ ./rabbitmqadmin --vhost=/ declare queue \
> name=synchro_user_error durable=true
$ ./rabbitmqadmin --vhost=/ declare binding \
> source=synchro_error destination=synchro_user_error \
> routing_key=user_error
So my queue "synchro_user" is consumed by my worker, and when I get an error, I publish the message to my exchange "synchro_error" with the routing key "user_error" so that it is delivered to my "synchro_user_error" queue.
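
On the consumer side, this approach could look like the sketch below. It is written in Python for illustration only: `channel` is assumed to be a pika-style channel (with `basic_publish` and `basic_ack`), and `sync_user` is a hypothetical stand-in for the real business logic.

```python
ERROR_EXCHANGE = "synchro_error"
ERROR_ROUTING_KEY = "user_error"


def sync_user(body: bytes) -> None:
    """Hypothetical business logic; here it always fails, as if the row were missing."""
    raise LookupError("user not replicated yet")


def on_message(channel, method, properties, body, handler=sync_user):
    """pika-style callback: process the message, park it in the error queue on failure."""
    try:
        handler(body)
    except Exception:
        # Republish the payload to the error exchange instead of requeueing it
        channel.basic_publish(
            exchange=ERROR_EXCHANGE,
            routing_key=ERROR_ROUTING_KEY,
            body=body,
            properties=properties,
        )
    # ACK in both cases so the message leaves synchro_user
    channel.basic_ack(delivery_tag=method.delivery_tag)
```

The message is acknowledged only after the republish call returns, so a failure to publish leaves it unacknowledged rather than lost.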

But over time, I added more and more queues to RabbitMQ and got more and more error messages. I had to process these messages by hand, and it took me too much time. Then one morning we arrived to find that the database had crashed during the night and that 4000 messages were sitting in the error queues.

Have you met DLX & DLK?

RabbitMQ offers a lot of options; by combining some of them, we can automate a message delay. Let me explain.
I keep the same principle: if I get an error while consuming the queue "synchro_user", I send the message to the queue "synchro_user_error". What changes is the definition of that queue. In practice I have to delete it and re-create it, because the arguments of a queue cannot be modified once it has been declared.

I start by adding a TTL to my error queue: messages in this queue expire after 5 minutes. I also add the dead letter exchange (DLX) and dead letter routing key (DLK) options. These options let RabbitMQ republish the message elsewhere once the TTL is reached, instead of simply discarding it. My DLX is "synchro" and my DLK is "user". Be careful not to forget to redo the binding between the queue "synchro_user_error" and the exchange "synchro_error".

Create the synchro_user_error queue

$ ./rabbitmqadmin --vhost=/ delete queue name=synchro_user_error
$ ./rabbitmqadmin --vhost=/ declare queue \
> name=synchro_user_error durable=true \
> arguments='{"x-message-ttl": 300000, "x-dead-letter-exchange": "synchro", "x-dead-letter-routing-key": "user"}'
$ ./rabbitmqadmin --vhost=/ declare binding \
> source=synchro_error destination=synchro_user_error \
> routing_key=user_error
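
The same declaration can also be done from code. The sketch below is Python and assumes a pika-style channel (`queue_delete`, `queue_declare`, `queue_bind`); the function and constant names are invented for this example. Note that deleting the queue also drops any messages still waiting in it.

```python
RETRY_DELAY_MS = 5 * 60 * 1000  # 5 minutes, the x-message-ttl value used above

ERROR_QUEUE_ARGUMENTS = {
    "x-message-ttl": RETRY_DELAY_MS,
    "x-dead-letter-exchange": "synchro",
    "x-dead-letter-routing-key": "user",
}


def declare_error_queue(channel) -> None:
    """Recreate synchro_user_error as a delay queue that feeds back into synchro_user."""
    # Arguments cannot be changed on an existing queue, so drop it first
    channel.queue_delete(queue="synchro_user_error")
    channel.queue_declare(
        queue="synchro_user_error",
        durable=True,
        arguments=ERROR_QUEUE_ARGUMENTS,
    )
    # Failed messages still arrive through the error exchange
    channel.queue_bind(
        queue="synchro_user_error",
        exchange="synchro_error",
        routing_key="user_error",
    )
```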

Queue description changed in RabbitMQ.

There you go! I now have a system that automatically works around my replication delay issues. If I get an error while consuming a message, I send it to the error queue, and RabbitMQ takes care of republishing it to the original queue at the end of the 5-minute TTL I specified.
This mechanism is also close to essential if your consumer calls an external API: when the API does not respond, the message is simply retried later.
