Spring Boot RabbitMQ Work Queues Example

Introduction

In this example I am going to show you how to build work queues (also called task queues) using the Spring Boot framework and the RabbitMQ broker. Work queues are used to distribute time-consuming tasks among multiple workers. The idea behind work queues is that each task is delivered to exactly one worker.

The purpose of a work queue is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, the task is scheduled to be done later: it is encapsulated as a message and sent to a queue. A worker process running in the background picks up the task and eventually executes the job. When you run many workers, the tasks are shared between them.

This concept is especially useful in web applications where it’s impossible to handle a complex task during a short HTTP request window.

Figure: RabbitMQ work queues (P – Producer | C1, C2 – Consumers)

The messages sent by the producer are distributed among the consumers. As there is no real long-running task in this example, I will calculate the next prime number for a given integer as a stand-in for a big task.
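
Before bringing in RabbitMQ, the idea of competing consumers can be sketched in plain Java. This is only an in-memory analogy, not how the broker works internally: a BlockingQueue stands in for the RabbitMQ queue, and the names WorkQueueSketch, distribute() and the STOP marker are my own assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class WorkQueueSketch {

    // Hypothetical marker that tells a worker to stop; real tasks must be non-negative.
    private static final int STOP = -1;

    // Puts all tasks on one shared queue and lets the workers compete for them.
    // Returns a map of task -> number of the worker that handled it.
    static Map<Integer, Integer> distribute(List<Integer> tasks, int workers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Map<Integer, Integer> handledBy = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[workers];

        for (int w = 0; w < workers; w++) {
            final int workerNo = w + 1;
            threads[w] = new Thread(() -> {
                try {
                    while (true) {
                        int task = queue.take(); // removes the task, so no other worker sees it
                        if (task == STOP) {
                            return;
                        }
                        handledBy.put(task, workerNo);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[w].start();
        }

        try {
            for (int task : tasks) {
                queue.put(task);
            }
            for (int w = 0; w < workers; w++) {
                queue.put(STOP); // one stop marker per worker, queued after all tasks
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handledBy;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> result = distribute(Arrays.asList(10000, 10001, 10002, 10003), 2);
        result.forEach((task, worker) -> System.out.println("Task " + task + " handled by worker " + worker));
    }
}
```

Which worker handles which task varies from run to run, but every task is handled by exactly one worker, which is the property a work queue relies on.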

Prerequisites

Java 1.8+, Spring Boot 2.6.2, Spring Boot AMQP 2.6.2, RabbitMQ Server 3.9.4 – 3.9.13

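Project Setup

I am going to create a Maven-based project for this example. The following pom.xml file can be used for your project too. Note that it sets the compiler source and target to 16, so change these values if you build with a different Java version.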

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-rabbitmq-workqueues</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>16</maven.compiler.source>
		<maven.compiler.target>16</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

application.properties

The application.properties file declares the queue name. The file is kept in the classpath folder src/main/resources.

queue.name=roytuts.workqueue
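
The file above does not configure a broker connection, so the application presumably relies on Spring Boot's defaults. If your RabbitMQ server is not running locally with the default account, you could add connection properties like the following. The values shown are assumptions that simply repeat the defaults as I understand them, so adjust them for your environment.

```properties
# Assumed values: a local broker with the default guest account
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
```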

Producer

A producer will produce or send a message to a queue.

package com.roytuts.spring.rabbitmq.workqueues.sender;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class Producer {

	@Autowired
	private Queue queue;

	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void sendMsg(final Integer number) {
		rabbitTemplate.convertAndSend(queue.getName(), number);
		System.out.println("Sent: " + number);
	}

}

The above class has a method sendMsg() that takes an integer as an argument and sends it to the queue through the broker. I have used RabbitTemplate to send the message and the Queue bean to obtain the queue name, which is passed to convertAndSend() as the routing key.
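
As far as I know, RabbitTemplate uses SimpleMessageConverter unless another converter is configured, and that converter turns a Serializable payload such as an Integer into bytes using standard Java serialization. The following plain Java sketch shows that kind of round trip without Spring. The class and method names are hypothetical, and this is not the converter's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

class PayloadRoundTripSketch {

    // Serializes the number into a byte array, similar to a message body.
    static byte[] toBytes(Integer number) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(number);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the number back from the byte array, as a receiver would.
    static Integer fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return (Integer) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(10000);
        System.out.println("Round trip: " + fromBytes(body)); // prints "Round trip: 10000"
    }
}
```

This is why the consumer can declare its handler parameter as Integer: the payload type survives the trip through the queue.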

Consumer

A consumer will consume a message from a queue upon arrival.

package com.roytuts.spring.rabbitmq.workqueues.receiver;

import java.math.BigInteger;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

@RabbitListener(queues = "${queue.name}")
public class Consumer {

	private final int srNo;

	public Consumer(int srNo) {
		this.srNo = srNo;
	}

	@RabbitHandler
	public void receiveMsg(final Integer number) {
		StopWatch stopWatch = new StopWatch();
		stopWatch.start();

		System.out.println("Received (" + srNo + "): " + number);

		BigInteger nextPrime = nextPrime(number);

		System.out.println("Next Prime Number: " + nextPrime);

		stopWatch.stop();

		System.out.println("Consumer(" + srNo + ") Done in " + stopWatch.getTotalTimeSeconds() + "s");
	}

	private BigInteger nextPrime(int number) {
		BigInteger veryBig = BigInteger.valueOf(number);
		return veryBig.nextProbablePrime();
	}

}

The above class uses the @RabbitListener annotation with the queue name, so once a message arrives in the queue, it is consumed by a consumer. The @RabbitHandler annotation marks the method that receives the message.

I have also specified srNo, a number that identifies which consumer received a message from the broker.

Configuration

The Config class configures beans for the Queue, the Producer and the Consumers. I have created two consumers here to consume the messages from the queue.

package com.roytuts.spring.rabbitmq.workqueues.config;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.roytuts.spring.rabbitmq.workqueues.receiver.Consumer;
import com.roytuts.spring.rabbitmq.workqueues.sender.Producer;

@Configuration
public class Config {

	@Value("${queue.name}")
	private String queueName;

	@Bean
	public Queue queue() {
		return new Queue(queueName);
	}

	@Bean
	public Producer producer() {
		return new Producer();
	}

	@Bean
	public Consumer consumer1() {
		return new Consumer(1);
	}

	@Bean
	public Consumer consumer2() {
		return new Consumer(2);
	}

}

Spring Boot Main Class

A class that has a main method and the @SpringBootApplication annotation starts the application. As no web starter is included in pom.xml, the application runs standalone and no embedded server such as Tomcat is started.

package com.roytuts.spring.rabbitmq.workqueues;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.rabbitmq.workqueues.sender.Producer;

@SpringBootApplication
public class RabbitMqWorkQueuesApp implements CommandLineRunner {

	@Autowired
	private Producer producer;

	public static void main(String[] args) {
		SpringApplication.run(RabbitMqWorkQueuesApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		for (int i = 10000; i < 10010; i++) {
			producer.sendMsg(i);
		}
	}

}

The above class implements the CommandLineRunner interface, so the run() method is executed once the application has started. I use it to send the messages because this is not a web application.

Testing Work Queues – Task Queues

Once you run the above main class with the RabbitMQ server running, you will see that the producer sent ten messages and the two consumers received them:

Sent: 10000
Sent: 10001
Sent: 10002
Sent: 10003
Sent: 10004
Sent: 10005
Sent: 10006
Sent: 10007
Sent: 10008
Sent: 10009
Received (1): 10000
Received (2): 10001
Next Prime Number: 10007
Next Prime Number: 10007
Consumer(2) Done in 0.0098059s
Consumer(1) Done in 0.0091935s
Received (2): 10003
Received (1): 10002
Next Prime Number: 10007
Consumer(2) Done in 0.0034209s
Received (2): 10005
Next Prime Number: 10007
Consumer(1) Done in 0.0045471s
Received (1): 10004
Next Prime Number: 10007
Consumer(2) Done in 0.0038564s
Next Prime Number: 10007
Consumer(1) Done in 0.0031815s
Received (2): 10007
Received (1): 10006
Next Prime Number: 10007
Consumer(1) Done in 0.0030485s
Received (1): 10008
Next Prime Number: 10009
Consumer(2) Done in 0.0049898s
Received (2): 10009
Next Prime Number: 10009
Consumer(1) Done in 0.0029158s
Next Prime Number: 10037
Consumer(2) Done in 0.0019554s

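Each consumer finds the next prime number for the integer value received from the producer. Because the two consumers run concurrently, their log lines are interleaved. Note that nextProbablePrime() returns the first probable prime strictly greater than its input, which is why 10007 yields 10009 and 10009 yields 10037.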
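
To check the prime values in the log without starting the broker, the consumer's calculation can be run on its own. This is a minimal sketch. The class name NextPrimeSketch is my own, while the call to nextProbablePrime() is the same one the consumer uses.

```java
import java.math.BigInteger;

class NextPrimeSketch {

    // Returns the first probable prime strictly greater than the given number.
    static BigInteger nextPrime(int number) {
        return BigInteger.valueOf(number).nextProbablePrime();
    }

    public static void main(String[] args) {
        // A few of the values sent by the producer
        for (int n : new int[] { 10000, 10007, 10009 }) {
            System.out.println(n + " -> " + nextPrime(n));
        }
        // prints:
        // 10000 -> 10007
        // 10007 -> 10009
        // 10009 -> 10037
    }
}
```

These results match the values in the log above.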

The queue roytuts.workqueue is created in the RabbitMQ server.

Source Code

Download
