Spring Boot RabbitMQ RPC Example

Table of Contents

Introduction

In this tutorial, I am going to show you how to create Spring Boot RabbitMQ RPC Example. RPC is an acronym which stands for Remote Procedure Call, in which a function is executed on a remote machine and result is returned to the client machine. Here I don’t have actually any RPC server, so I am going to create a dummy RPC service that will return factorial of a given number. In this example, I am going to call Sender as Client and Receiver as Server.

Related Posts:

Callback Queue

Doing RPC over RabbitMQ is simple, where a client sends a request message and a server replies with a response message. In order to receive a response you need to send a callback queue address with the request.

Spring AMQP’s RabbitTemplate handles the callback queue for you when you use the convertSendAndReceive() method. There is no need to do any other setup when using the RabbitTemplate.

Message properties

The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value).

contentType: Used to describe the mime-type of the encoding. For example, if you are using JSON encoding frequently, it is a good practice to set this property to: application/json.

replyTo: Commonly used to name a callback queue.

correlationId: Useful to correlate RPC responses with requests.

Correlation Id

Spring AMQP allows you to focus on the message style you’re working with and hide the details of message plumbing required to support this style. For example, typically the native client would create a callback queue for every RPC request. That’s pretty inefficient so an alternative is to create a single callback queue per client.

That raises a new issue, having received a response in that queue it’s not clear to which request the response belongs. That’s when the correlationId property is used. Spring AMQP automatically sets a unique value for every request. In addition it handles the details of matching the response with the correct correlationID.

One reason that Spring AMQP makes RPC style easier is that sometimes you may want to ignore unknown messages in the callback queue, rather than failing with an error. It’s due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. Spring AMQP client handles the duplicate responses gracefully, and the RPC should ideally be idempotent.

The following figure summarizes the RPC flow of the message:

rabbitmq spring rpc example
  • A direct exchange and client are setup.
  • The client will leverage the convertSendAndReceive method, passing the exchange name, routing key and the message.
  • The request is sent to a RPC queue (roytuts.rpc).
  • The server (RPC worker) is waiting for requests on that queue. When a request appears, it performs the task and sends a message with the result back to the client, using the queue from the replyTo field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application. Again, this is done automatically via the RabbitTemplate.

Prerequisites

Java 1.8+, Spring Boot 2.6.2, Spring AMQP 2.6.2, RabbitMQ Server 3.9.4 – 3.9.13, Maven 3.8.2

Project Setup

The following pom.xml file can be used for the maven based project creation in your favorite IDE or tool.

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-rabbitmq-rpc</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>16</maven.compiler.source>
		<maven.compiler.target>16</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Application Properties

The application.properties file is kept with the following content under classpath folder src/main/resources.

xchange.name=roytuts.rpc
queue.name=roytuts.rpc.queue

Config Class

The configuration class defines various beans for DirectExchange, Server, Client, Queue, etc. for the application.

@Configuration
public class Config {

	@Value("${queue.name}")
	private String queueName;

	@Value("${xchange.name}")
	private String directXchangeName;

	@Bean
	public Queue queue() {
		return new Queue(queueName);
	}

	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange(directXchangeName);
	}

	@Bean
	public Binding binding(DirectExchange exchange, Queue queue) {
		return BindingBuilder.bind(queue).to(exchange).with("roytuts");
	}

	@Bean
	public Client client() {
		return new Client();
	}

	@Bean
	public Server server() {
		return new Server();
	}

}

The server and client will use the same exchange and the binding from the queue to the exchange with roytuts routing key.

Server

The server class that receives the message from client class and returns a result.

public class Server {

	@RabbitListener(queues = "${queue.name}")
	public long factorial(int n) {
		System.out.println("Received request for " + n);

		long result = computeFactorial(n);

		System.out.println("Returned " + result);

		return result;
	}

	public long computeFactorial(int number) {
		long result = 1;

		for (int f = 2; f <= number; f++) {
			result *= f;
		}

		return result;
	}

}

The receiver method is annotated with a @RabbitListener and defining the queue it’s listening on. You need to set @SendTo("roytuts.rpc.replies") when the client doesn’t set replyTo on the server method.

Client

The following client that sends requests to the queue for processing on server side.

public class Client {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Autowired
	private DirectExchange directExchange;

	public void send(int n) {
		Long response = (Long) rabbitTemplate.convertSendAndReceive(directExchange.getName(), "roytuts", n);

		System.out.println("Got " + response + "");
	}

}

I autowired the RabbitTemplate and the DirectExchange bean. Then I invoked template.convertSendAndReceive with the parameters exchange name, routing key and message. Finally, print the result in console.

Spring Boot Main Class

The following class will run the application.

@SpringBootApplication
public class SpringRabbitMqRpcApp implements CommandLineRunner {

	@Autowired
	private Client client;

	public static void main(String[] args) {
		SpringApplication.run(SpringRabbitMqRpcApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		for (int i = 1; i < 10; i++) {
			client.send(i);
		}
	}

}

Output – Testing RPC

The above class produces the following output:

10:49:04.340  INFO 15272 --- [  restartedMain] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, index=0, consumerTag=amq.ctag-gFeqdefpGqryy7TCa8Lu3g identity=3d16e3c2] started
Received request for 1
Returned 1
Got 1
Received request for 2
Returned 2
Got 2
Received request for 3
Returned 6
Got 6
Received request for 4
Returned 24
Got 24
Received request for 5
Returned 120
Got 120
Received request for 6
Returned 720
Got 720
Received request for 7
Returned 5040
Got 5040
Received request for 8
Returned 40320
Got 40320
Received request for 9
Returned 362880
Got 362880

You will find the following queue has been created in the RabbitMQ server. You can check it in the RabbitMQ management console.

spring boot rabbitmq rpc example

Hope you got an idea how to build RPC example using Spring Boot and RabbitMQ server.

Source Code

Download

Leave a Reply

Your email address will not be published.