Spring RabbitMQ Routing Example

Introduction

In this Spring RabbitMQ Routing example I am going to show you how to route messages to particular subscribers. Therefore, not every receiver is going to receive every message; each receiver gets only a subset of the messages.

A binding is the relationship between an exchange and a queue that lets messages pass from the exchange to the queue. A binding can take an extra key parameter. The queue and the exchange are passed into the BindingBuilder, and the queue is simply bound “to” the exchange “with” a key:

@Bean
public Binding binding(DirectExchange directExchange, Queue autoDeleteQueue) {
    return BindingBuilder.bind(autoDeleteQueue).to(directExchange).with("key");
}

The meaning of a binding key depends on the exchange type. A fanout exchange simply ignores its value, because it broadcasts messages to all the queues bound to it.
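To make the difference concrete, here is a minimal sketch that places a fanout binding next to a direct binding. The class name BindingKeySketch and the bean method names are hypothetical, and the sketch assumes that FanoutExchange, DirectExchange and Queue beans are declared elsewhere in the application.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, for illustration only.
// Assumes the exchange and queue beans are defined elsewhere.
@Configuration
public class BindingKeySketch {

	// Fanout exchange: no key is supplied, to(...) returns the Binding directly.
	@Bean
	public Binding fanoutBinding(FanoutExchange fanoutExchange, Queue autoDeleteQueue) {
		return BindingBuilder.bind(autoDeleteQueue).to(fanoutExchange);
	}

	// Direct exchange: the binding is completed by with(...), which sets the binding key.
	@Bean
	public Binding directBinding(DirectExchange directExchange, Queue autoDeleteQueue) {
		return BindingBuilder.bind(autoDeleteQueue).to(directExchange).with("key");
	}

}
```

With the fanout exchange there is no with(...) step at all, which is how the builder reflects the fact that the key plays no role in fanout routing.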

Direct Exchange

In this example, I am going to use DirectExchange, which will allow me to filter messages instead of broadcasting them to every queue. The routing algorithm behind a direct exchange is simple: a message goes to the queues whose binding key exactly matches the routing key of the message.

For example, you may want a program which writes log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.

Let’s look at the following image:

[Image: spring rabbitmq routing]

In the above image, the direct exchange (X) is bound to two queues. The first queue is bound with binding key info, and the second has two bindings, one with binding key warn and the other one with error.

In such a setup a message published to the exchange with a routing key info will be routed to queue Q1. Messages with a routing key of warn or error will go to Q2. All other messages will be discarded.

You can also bind multiple queues with the same binding key.
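The routing rule described in this section can be sketched in plain Java without a broker. The class below is only an in-memory stand-in for a direct exchange, and its name and methods (DirectRoutingSketch, bind, route, imageSetup) are hypothetical; it is not part of Spring AMQP or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of direct exchange routing; no RabbitMQ broker is involved.
class DirectRoutingSketch {

	// binding key -> names of the queues bound with that key
	private final Map<String, List<String>> bindings = new LinkedHashMap<>();

	void bind(String queueName, String bindingKey) {
		bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
	}

	// Exact match only: a routing key without a binding returns an empty list,
	// which corresponds to the message being discarded.
	List<String> route(String routingKey) {
		return bindings.getOrDefault(routingKey, Collections.emptyList());
	}

	// The bindings from the image: Q1 with info, Q2 with warn and error.
	static DirectRoutingSketch imageSetup() {
		DirectRoutingSketch exchange = new DirectRoutingSketch();
		exchange.bind("Q1", "info");
		exchange.bind("Q2", "warn");
		exchange.bind("Q2", "error");
		return exchange;
	}

	public static void main(String[] args) {
		DirectRoutingSketch exchange = imageSetup();
		System.out.println(exchange.route("info"));  // [Q1]
		System.out.println(exchange.route("warn"));  // [Q2]
		System.out.println(exchange.route("debug")); // []

		// Binding a second queue with the same key delivers the message to both queues.
		exchange.bind("Q1", "warn");
		System.out.println(exchange.route("warn"));  // [Q2, Q1]
	}

}
```

The last call shows the case of multiple queues bound with the same key: every queue whose binding key matches gets its own copy of the message.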

Prerequisites

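Java 1.8+ (the pom.xml below sets the compiler source and target to 16, so adjust these values to match your JDK), Spring Boot 2.6.2, Spring Boot Starter AMQP 2.6.2, RabbitMQ Server 3.9.4 – 3.9.13, Maven 3.8.2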

Project Setup

I am creating a Maven-based project. The following pom.xml file can be used for your project as well.

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-rabbitmq-routing</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>16</maven.compiler.source>
		<maven.compiler.target>16</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Application Properties

The src/main/resources/application.properties file contains the following key/value pairs for the application.

direct.xchange.name=roytuts.direct
routing.key.info=info
routing.key.warn=warn
routing.key.error=error

Configuration

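The Config class defines various Spring Beans for this application. The DirectExchange is configured to route messages by routing key. I have defined bindings for the queues with routing keys: the first queue is bound with info and warn, and the second queue is bound with warn and error, so a message with the routing key warn is delivered to both queues.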

package com.roytuts.spring.rabbitmq.routing.config;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.roytuts.spring.rabbitmq.routing.receiver.Receiver;
import com.roytuts.spring.rabbitmq.routing.sender.Sender;

@Configuration
public class Config {

	@Value("${direct.xchange.name}")
	private String directXchangeName;

	@Value("${routing.key.info}")
	private String routingKeyInfo;

	@Value("${routing.key.warn}")
	private String routingKeyWarn;

	@Value("${routing.key.error}")
	private String routingKeyError;

	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange(directXchangeName);
	}

	@Bean
	public Queue deleteQueue1() {
		return new AnonymousQueue();
	}

	@Bean
	public Queue deleteQueue2() {
		return new AnonymousQueue();
	}

	@Bean
	public Binding binding11(DirectExchange exchange, Queue deleteQueue1) {
		return BindingBuilder.bind(deleteQueue1).to(exchange).with(routingKeyInfo);
	}

	@Bean
	public Binding binding12(DirectExchange exchange, Queue deleteQueue1) {
		return BindingBuilder.bind(deleteQueue1).to(exchange).with(routingKeyWarn);
	}

	@Bean
	public Binding binding21(DirectExchange exchange, Queue deleteQueue2) {
		return BindingBuilder.bind(deleteQueue2).to(exchange).with(routingKeyWarn);
	}

	@Bean
	public Binding binding22(DirectExchange exchange, Queue deleteQueue2) {
		return BindingBuilder.bind(deleteQueue2).to(exchange).with(routingKeyError);
	}

	@Bean
	public Receiver receiver() {
		return new Receiver();
	}

	@Bean
	public Sender sender() {
		return new Sender();
	}

}

Sender

The following sender class publishes messages to the direct exchange with a routing key, and the exchange delivers each message to the queues bound with that key. For this example, I am selecting the routing key randomly for each message.

package com.roytuts.spring.rabbitmq.routing.sender;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

public class Sender {

	@Autowired
	private DirectExchange exchange;

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Value("${routing.key.info}")
	private String routingKeyInfo;

	@Value("${routing.key.warn}")
	private String routingKeyWarn;

	@Value("${routing.key.error}")
	private String routingKeyError;

	public void sendMsg(final Integer number) {
		final List<String> routings = Arrays.asList(routingKeyInfo, routingKeyWarn, routingKeyError);
		final String routing = routings.get(new Random().nextInt(routings.size()));

		rabbitTemplate.convertAndSend(exchange.getName(), routing, number);

		System.out.println("Sent: " + number);
	}

}
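The Sender above creates a new Random on every call, which works fine for a demo. As an optional variation, the key selection can be pulled into a small helper that reuses one generator. The class name RoutingKeyPicker is hypothetical, and the sketch leaves out the RabbitTemplate call so that it runs on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical helper: picks one of the configured routing keys at random.
class RoutingKeyPicker {

	private final List<String> keys;
	private final Random random;

	RoutingKeyPicker(List<String> keys, Random random) {
		this.keys = keys;
		this.random = random;
	}

	// nextInt(n) returns a value from 0 (inclusive) to n (exclusive),
	// so the index is always valid for the list.
	String next() {
		return keys.get(random.nextInt(keys.size()));
	}

	public static void main(String[] args) {
		RoutingKeyPicker picker = new RoutingKeyPicker(Arrays.asList("info", "warn", "error"), new Random());
		for (int i = 1000; i < 1005; i++) {
			// The printed keys vary from run to run.
			System.out.println(i + " -> " + picker.next());
		}
	}

}
```

Passing the Random in through the constructor also makes it possible to supply a seeded generator when a repeatable sequence of keys is wanted.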

Receiver

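The receiver class receives messages from the queues. The @RabbitListener annotation is used to listen to a queue for incoming messages. After printing the received number, each listener computes the next prime number greater than it and reports the elapsed time.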

package com.roytuts.spring.rabbitmq.routing.receiver;

import java.math.BigInteger;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Receiver {

	@RabbitListener(queues = "#{deleteQueue1.name}")
	public void receive1(Integer num) throws InterruptedException {
		receiveMsg(num, 1);
	}

	@RabbitListener(queues = "#{deleteQueue2.name}")
	public void receive2(Integer num) throws InterruptedException {
		receiveMsg(num, 2);
	}

	public void receiveMsg(final Integer number, int receiver) {
		StopWatch stopWatch = new StopWatch();
		stopWatch.start();

		System.out.println("Received (" + receiver + "): " + number);

		BigInteger nextPrime = nextPrime(number);

		System.out.println("Next Prime Number (" + receiver + "): " + nextPrime);

		stopWatch.stop();

		System.out.println("Consumer(" + receiver + ") Done in " + stopWatch.getTotalTimeSeconds() + "s");
	}

	private BigInteger nextPrime(int number) {
		BigInteger veryBig = new BigInteger(String.valueOf(number));
		return veryBig.nextProbablePrime();
	}

}
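The prime calculation in the Receiver can be tried on its own. The sketch below makes the same BigInteger.nextProbablePrime() call, which returns the first probable prime greater than the given number; the class name NextPrimeSketch is hypothetical.

```java
import java.math.BigInteger;

// Stand-alone version of the Receiver's nextPrime helper.
class NextPrimeSketch {

	// Returns the first probable prime strictly greater than the given number.
	static BigInteger nextPrime(int number) {
		return BigInteger.valueOf(number).nextProbablePrime();
	}

	public static void main(String[] args) {
		System.out.println(nextPrime(1000)); // 1009
		System.out.println(nextPrime(1008)); // 1009
		System.out.println(nextPrime(1009)); // 1013
	}

}
```

Since the result is always greater than the input, every number from 1000 to 1008 gives 1009, while 1009 itself gives 1013.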

Spring Boot Main Class

The following class is used to send messages through the sender. It sends the numbers 1000 to 1009 once the application has started.

package com.roytuts.spring.rabbitmq.routing;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.rabbitmq.routing.sender.Sender;

@SpringBootApplication
public class RabbitMqRoutingApp implements CommandLineRunner {

	@Autowired
	private Sender sender;

	public static void main(String[] args) {
		SpringApplication.run(RabbitMqRoutingApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		for (int i = 1000; i < 1010; i++) {
			sender.sendMsg(i);
		}
	}

}

Testing The RabbitMQ Routing Example

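Executing the application will produce output similar to the following. The routing keys are chosen randomly, so which receiver gets which number, and the order of the lines, will differ between runs: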

Sent: 1000
Sent: 1001
Sent: 1002
Sent: 1003
Sent: 1004
Sent: 1005
Sent: 1006
Sent: 1007
Sent: 1008
Sent: 1009
Received (2): 1000
Received (1): 1001
Next Prime Number (1): 1009
Next Prime Number (2): 1009
Consumer(1) Done in 0.0084251s
Consumer(2) Done in 0.008561s
Received (1): 1002
Received (2): 1001
Next Prime Number (2): 1009
Consumer(2) Done in 0.0063701s
Next Prime Number (1): 1009
Consumer(1) Done in 0.0071166s
Received (2): 1002
Received (1): 1003
Next Prime Number (2): 1009
Consumer(2) Done in 0.0058654s
Next Prime Number (1): 1009
Consumer(1) Done in 0.0055743s
Received (2): 1003
Received (1): 1004
Next Prime Number (2): 1009
Consumer(2) Done in 0.0039935s
Next Prime Number (1): 1009
Consumer(1) Done in 0.0044229s
Received (2): 1004
Received (1): 1005
Next Prime Number (1): 1009
Consumer(1) Done in 0.0015527s
Next Prime Number (2): 1009
Consumer(2) Done in 0.0024523s
Received (1): 1008
Received (2): 1006
Next Prime Number (1): 1009
Consumer(1) Done in 0.0016036s
Next Prime Number (2): 1009
Consumer(2) Done in 0.0017328s
Received (1): 1009
Received (2): 1007
Next Prime Number (2): 1009
Next Prime Number (1): 1013
Consumer(2) Done in 0.00143s
Consumer(1) Done in 0.0016141s
Received (2): 1008
Next Prime Number (2): 1009
Consumer(2) Done in 9.604E-4s

While the application is running, you will find the following queues in the RabbitMQ server:

[Image: rabbitmq routing key]
