Spring RabbitMQ Retry And Error Handling Example

Introduction

In this example I am going to show you how to retry failed messages consumed from a RabbitMQ message broker. A failed message is retried a few times, and if the error still exists after the maximum number of attempts, the message is moved to the dead letter queue.

What is Dead Letter Queue?

A dead letter queue (DLQ), sometimes called an undelivered message queue, is a queue used to store messages that meet one or more of the following criteria:

  1. Message that is sent to a queue that does not exist.
  2. Queue length limit exceeded.
  3. Message length limit exceeded.
  4. Message is rejected by another queue exchange.
  5. Message reaches a threshold read counter number, because it is not consumed. Sometimes this is called a “back out queue”.
  6. The message expires due to per-message TTL (time to live).
  7. Message is not processed successfully.

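Keeping these messages in a dead letter queue lets developers inspect them and work out why they could not be delivered or processed. RabbitMQ itself dead-letters a message, for example, when it is rejected without being requeued, when its TTL expires, or when the queue length limit is exceeded; this example relies on the first case.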
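
To make the idea concrete before bringing in RabbitMQ, here is a minimal in-memory sketch of the flow this example follows: a handler is tried a fixed number of times and, if every attempt fails, the message is set aside instead of being lost. The class name RetryThenDeadLetterSketch, its two lists and the deliver method are my own illustrative names, not RabbitMQ or Spring APIs, and a real broker does considerably more than this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** In-memory model only: no RabbitMQ or Spring classes are involved. */
final class RetryThenDeadLetterSketch {

	/** Stand-in for the dead letter queue: messages whose processing kept failing. */
	final List<String> deadLetters = new ArrayList<>();

	/** Messages that were processed successfully. */
	final List<String> processed = new ArrayList<>();

	/**
	 * Tries the handler up to maxAttempts times. If every attempt throws,
	 * the message is recorded as dead-lettered. Returns the attempts made.
	 */
	int deliver(String message, int maxAttempts, Consumer<String> handler) {
		for (int attempt = 1; attempt <= maxAttempts; attempt++) {
			try {
				handler.accept(message);
				processed.add(message);
				return attempt;
			} catch (RuntimeException e) {
				// Failed attempt: loop again if attempts remain.
			}
		}
		deadLetters.add(message);
		return maxAttempts;
	}

	public static void main(String[] args) {
		RetryThenDeadLetterSketch sketch = new RetryThenDeadLetterSketch();
		// Hypothetical handler that accepts letters only.
		Consumer<String> lettersOnly = name -> {
			if (!name.matches("[a-zA-Z]+")) {
				throw new IllegalArgumentException("Name should contain only alphabets");
			}
		};
		sketch.deliver("Roy", 3, lettersOnly);
		sketch.deliver("Roy Tutorials", 3, lettersOnly);
		System.out.println("Processed: " + sketch.processed);
		System.out.println("Dead-lettered: " + sketch.deadLetters);
	}
}
```

Running the main method prints Processed: [Roy] followed by Dead-lettered: [Roy Tutorials]. The sketch leaves out the waiting time between attempts on purpose, so that only the retry-then-set-aside logic is visible.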

Prerequisites

Java 11+ (the pom.xml below targets Java 11), Maven 3.8.2, Spring Boot 2.6.2 (spring-boot-starter-amqp 2.6.2), RabbitMQ Server 3.9.4 – 3.9.13

Project Setup

The following pom.xml file can be used to create the Maven-based project in your favorite IDE or tool.

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-rabbitmq-error-retry-dlq</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>11</maven.compiler.source>
		<maven.compiler.target>11</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Application Properties

The application.properties file is kept under the classpath folder src/main/resources with the following content.

exchange.dl=dlExchange
exchange.roytuts=roytutsExchange
queue.dl=dl.queue
queue.roytuts=roytuts.queue
routing.key.dl=dl
routing.key.roytuts=roytuts

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=2s
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=6s
spring.rabbitmq.listener.simple.retry.multiplier=2

I have enabled the retry mechanism with spring.rabbitmq.listener.simple.retry.enabled=true. The property spring.rabbitmq.listener.simple.retry.initial-interval=2s makes the listener wait 2 seconds before the first retry.

spring.rabbitmq.listener.simple.retry.max-attempts=3 limits delivery to 3 attempts in total, that is, the first attempt plus 2 retries. After the last failed attempt the message is rejected and the broker routes it to the dead letter queue.

spring.rabbitmq.listener.simple.retry.max-interval=6s caps the interval between two attempts at 6 seconds.

The interval is multiplied by 2 after each failed attempt because of spring.rabbitmq.listener.simple.retry.multiplier=2. With these settings the waits are 2s and then 4s; if more attempts were allowed, every later wait would be capped at 6s.
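
The arithmetic behind these intervals can be checked with a few lines of plain Java. This is only a sketch of a capped exponential back-off, which is how I understand these properties to be applied; the class BackoffScheduleSketch and the method waitsMillis are illustrative names of my own and not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffScheduleSketch {

	/**
	 * Waits (in milliseconds) between consecutive attempts for an exponential
	 * back-off with an upper bound. With maxAttempts attempts there are
	 * maxAttempts - 1 waits, because nothing is waited for after the last failure.
	 */
	static List<Long> waitsMillis(int maxAttempts, long initialMs, double multiplier, long maxMs) {
		List<Long> waits = new ArrayList<>();
		double interval = initialMs;
		for (int attempt = 1; attempt < maxAttempts; attempt++) {
			// Never wait longer than the configured maximum.
			waits.add((long) Math.min(interval, maxMs));
			interval = interval * multiplier;
		}
		return waits;
	}

	public static void main(String[] args) {
		// Values from application.properties: 3 attempts, 2s initial, x2, 6s cap.
		System.out.println(waitsMillis(3, 2000, 2.0, 6000)); // [2000, 4000]
		// Same settings with 5 attempts, to show the cap taking effect.
		System.out.println(waitsMillis(5, 2000, 2.0, 6000)); // [2000, 4000, 6000, 6000]
	}
}
```

With the configured values a failing message therefore spends about 6 seconds (2s + 4s) in retries before it is given up on, which matches the roughly 6-second gaps between the "Retries exhausted" entries in the log further down.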

Custom Exception

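I am creating a custom exception class that can be thrown for an invalid name.

package com.roytuts.spring.rabbitmq.error.retry.dlq.exception;

public class InvalidNameException extends Exception {

	private static final long serialVersionUID = 1L;

	// Lets the exception carry a message such as "Name should contain only alphabets"
	public InvalidNameException(String message) {
		super(message);
	}

}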

Producer or Sender

The producer or sender class sends a message to the exchange configured in exchange.roytuts using the routing key configured in routing.key.roytuts.

package com.roytuts.spring.rabbitmq.error.retry.dlq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Producer {

	@Value("${exchange.roytuts}")
	private String roytutsExchange;

	@Value("${routing.key.roytuts}")
	private String routingKeyRoytuts;

	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void sendName(final String name) {
		rabbitTemplate.convertAndSend(roytutsExchange, routingKeyRoytuts, name);
		System.out.println("Sent: " + name);
	}

}

Consumer or Receiver

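The consumer or receiver listens on the queue configured in queue.roytuts and throws an exception when the received name contains anything other than letters. Note that the import below resolves to javax.naming.InvalidNameException from the JDK, not to the custom exception class shown earlier; this is also the exception type that appears in the log output.

package com.roytuts.spring.rabbitmq.error.retry.dlq.consumer;

import java.util.regex.Pattern;

// JDK exception with a String constructor, not the custom class from the exception package
import javax.naming.InvalidNameException;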

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "${queue.roytuts}")
public class Consumer {

	@RabbitHandler
	public void receiveMsg(final String name) throws InvalidNameException {
		if (!Pattern.matches("[a-zA-Z]+", name)) {
			throw new InvalidNameException("Name should contain only alphabets");
		}

		System.out.println("Received: " + name);
	}

}

Config Class

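The configuration class declares the exchanges, queues and bindings required by the application. The main queue is declared with the x-dead-letter-exchange and x-dead-letter-routing-key arguments, so a message rejected from it is routed through the dead letter exchange to the dead letter queue.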

package com.roytuts.spring.rabbitmq.error.retry.dlq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Config {

	@Value("${exchange.dl}")
	private String dlExchange;

	@Value("${exchange.roytuts}")
	private String roytutsExchange;

	@Value("${queue.dl}")
	private String dlQueue;

	@Value("${queue.roytuts}")
	private String roytutsQueue;

	@Value("${routing.key.dl}")
	private String routingKeyDl;

	@Value("${routing.key.roytuts}")
	private String routingKeyRoytuts;

	@Bean
	public DirectExchange dlExchange() {
		return new DirectExchange(dlExchange);
	}

	@Bean
	public DirectExchange roytutsExchange() {
		return new DirectExchange(roytutsExchange);
	}

	@Bean
	public Queue dlq() {
		return QueueBuilder.durable(dlQueue).build();
	}

	@Bean
	public Queue queue() {
		return QueueBuilder.durable(roytutsQueue).withArgument("x-dead-letter-exchange", dlExchange)
				.withArgument("x-dead-letter-routing-key", routingKeyDl).build();
	}

	@Bean
	public Binding binding() {
		return BindingBuilder.bind(queue()).to(roytutsExchange()).with(routingKeyRoytuts);
	}

	@Bean
	public Binding dlqBinding() {
		return BindingBuilder.bind(dlq()).to(dlExchange()).with(routingKeyDl);
	}

}
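
The topology declared above can be pictured with a small in-memory model. This is a sketch under simplifying assumptions: it only mimics how a direct exchange picks a queue by exact routing key and where a rejected message would be dead-lettered. DirectRoutingSketch and all of its methods are hypothetical names of my own, not Spring AMQP or RabbitMQ APIs; the exchange, queue and routing key values are the ones from application.properties.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class DirectRoutingSketch {

	/** exchange name -> (routing key -> queue name), as a direct exchange would route. */
	private final Map<String, Map<String, String>> bindings = new HashMap<>();

	/** queue name -> { dead letter exchange, dead letter routing key }. */
	private final Map<String, String[]> deadLetterArgs = new HashMap<>();

	void bind(String queue, String exchange, String routingKey) {
		bindings.computeIfAbsent(exchange, e -> new HashMap<>()).put(routingKey, queue);
	}

	void deadLetterTo(String queue, String dlExchange, String dlRoutingKey) {
		deadLetterArgs.put(queue, new String[] { dlExchange, dlRoutingKey });
	}

	/** Queue that a message published with this exchange and key lands in, or null. */
	String route(String exchange, String routingKey) {
		Map<String, String> byKey = bindings.getOrDefault(exchange, Collections.emptyMap());
		return byKey.get(routingKey);
	}

	/** Queue that a message rejected from the given queue would be dead-lettered to, or null. */
	String deadLetterTarget(String queue) {
		String[] args = deadLetterArgs.get(queue);
		return args == null ? null : route(args[0], args[1]);
	}

	/** Mirrors the Config class with the values from application.properties. */
	static DirectRoutingSketch tutorialTopology() {
		DirectRoutingSketch topology = new DirectRoutingSketch();
		topology.bind("roytuts.queue", "roytutsExchange", "roytuts");
		topology.bind("dl.queue", "dlExchange", "dl");
		topology.deadLetterTo("roytuts.queue", "dlExchange", "dl");
		return topology;
	}

	public static void main(String[] args) {
		DirectRoutingSketch topology = tutorialTopology();
		System.out.println(topology.route("roytutsExchange", "roytuts"));
		System.out.println(topology.deadLetterTarget("roytuts.queue"));
	}
}
```

In this model a message published to roytutsExchange with the key roytuts lands in roytuts.queue, and a message rejected from roytuts.queue ends up in dl.queue. The dead letter queue itself has no dead-letter arguments, so nothing is routed onward from it.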

Spring Boot Main Class

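A class with a main method and the @SpringBootApplication annotation is enough to start the application. No embedded web server is involved, because the project has no web starter; the class implements CommandLineRunner so that a few names are sent as soon as the application starts.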

package com.roytuts.spring.rabbitmq.error.retry.dlq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.rabbitmq.error.retry.dlq.producer.Producer;

@SpringBootApplication
public class SpringRabbitMqDlqApp implements CommandLineRunner {

	@Autowired
	private Producer producer;

	public static void main(String[] args) {
		SpringApplication.run(SpringRabbitMqDlqApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		producer.sendName("Soumitra");
		producer.sendName("Roy");
		producer.sendName("Roy Tutorials");
		producer.sendName("Soumitra 2");
		producer.sendName("Roy Tutorials2");
	}

}
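
Which of these five names pass the consumer's check can be verified in isolation. The following is a small standalone sketch that reuses the same regular expression as the consumer; the class NameCheckSketch and its methods are illustrative names of my own.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class NameCheckSketch {

	// Same pattern as in the consumer: one or more ASCII letters, nothing else.
	private static final Pattern LETTERS_ONLY = Pattern.compile("[a-zA-Z]+");

	static boolean isValid(String name) {
		return LETTERS_ONLY.matcher(name).matches();
	}

	/** Names that the consumer would reject, in their original order. */
	static List<String> rejected(List<String> names) {
		return names.stream().filter(n -> !isValid(n)).collect(Collectors.toList());
	}

	public static void main(String[] args) {
		List<String> sent = List.of("Soumitra", "Roy", "Roy Tutorials", "Soumitra 2", "Roy Tutorials2");
		System.out.println("Rejected: " + rejected(sent));
	}
}
```

The main method prints Rejected: [Roy Tutorials, Soumitra 2, Roy Tutorials2], because a space or a digit is enough to fail the pattern.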

Testing RabbitMQ Dead Letter Queue

Once you run the above main class you will see the following output:

20.378  INFO 22836 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
20.430  INFO 22836 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#5fe18d6d:0/SimpleConnection@18cce147 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56240]
21.131  INFO 22836 --- [  restartedMain] c.r.s.r.e.r.dlq.SpringRabbitMqDlqApp     : Started SpringRabbitMqDlqApp in 2.001 seconds (JVM running for 2.433)
Sent: Soumitra
Sent: Roy
Sent: Roy Tutorials
Sent: Soumitra 2
Sent: Roy Tutorials2
Received: Soumitra
Received: Roy
27.167  WARN 22836 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'Roy Tutorials' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roytutsExchange, receivedRoutingKey=roytuts, deliveryTag=3, consumerTag=amq.ctag-d6XWSD3q37Fmbo6o9tqthw, consumerQueue=roytuts.queue])

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

27.168  WARN 22836 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

33.194  WARN 22836 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'Soumitra 2' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roytutsExchange, receivedRoutingKey=roytuts, deliveryTag=4, consumerTag=amq.ctag-d6XWSD3q37Fmbo6o9tqthw, consumerQueue=roytuts.queue])

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

33.195  WARN 22836 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

39.209  WARN 22836 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'Roy Tutorials2' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roytutsExchange, receivedRoutingKey=roytuts, deliveryTag=5, consumerTag=amq.ctag-d6XWSD3q37Fmbo6o9tqthw, consumerQueue=roytuts.queue])

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

39.213  WARN 22836 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

In the above output you can see that only Soumitra and Roy were received by the receiver, because these two strings contain nothing but letters. Each of the remaining three strings caused an exception on every attempt and, once the retries were exhausted, was rejected and moved to the dead letter queue, as shown in the following image:

[Image: Spring RabbitMQ dead letter queue]
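
The log above names RejectAndDontRequeueRecoverer, which is the recoverer that rejects the message once the retries are exhausted. As far as I understand it, the retry properties in application.properties correspond roughly to the following explicit interceptor. This is a sketch for illustration, not the code Spring Boot actually runs; the class RetryInterceptorSketch and the method build are hypothetical names, and the interceptor has no effect unless it is added to a listener container factory's advice chain.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

// Hypothetical helper class, not part of the tutorial project.
public final class RetryInterceptorSketch {

	private RetryInterceptorSketch() {
	}

	public static RetryOperationsInterceptor build() {
		return RetryInterceptorBuilder.stateless()
				// 3 attempts in total, as in retry.max-attempts=3
				.maxAttempts(3)
				// initial interval 2000 ms, multiplier 2.0, maximum interval 6000 ms
				.backOffOptions(2000, 2.0, 6000)
				// after the last failure, reject without requeueing so that
				// the broker can dead-letter the message
				.recoverer(new RejectAndDontRequeueRecoverer())
				.build();
	}
}
```

Because the retries are stateless, they happen inside the listener thread and the broker sees a single delivery that is finally rejected, which is why the log shows redelivered=false for the failed messages.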
