Spring Boot RabbitMQ Message Pattern Example

Table of Contents

Introduction

In this tutorial I am going to show you how to build Spring Boot RabbitMQ message pattern example with TopicExchangeFanoutExchange is only capable to broadcast messages to all receivers and it has limitations to send messages selectively. The DirectExchange can allow you to send messages selectively based on routing key, however it has also limitations and it cannot do routing based on multiple criteria.

In messaging system, there may be requirements to subscribe to queues not only based on routing keys, but also based on the source which produced the message. For example, in Unix systems syslog tool routes messages based on severity (info/warn/error/critic) of the messages.

Related Posts:

Topic exchange

Messages sent to a topic exchange can’t have an arbitrary routing key – it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. A few valid routing key examples: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”. There can be as many words in the routing key as you like, up to the limit of 255 bytes.

The binding key must also be in the same form. The logic behind the topic exchange is similar to a direct one – a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However there are two important special cases for binding keys:

  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.

Let’s look at the following image:

spring boot rabbitmq message pattern

In the above image, there are three bindings: Q1 is bound with binding key *.info.* and Q2 with *.*.warn and error.#.

These bindings can be interpreted as:

  • Q1 is interested in all info messages
  • Q2 is interest about warn and error messages

A message with a routing key set to “msg.info.warn” will be delivered to both queues. Message “error.info.msg” also will go to both of them. On the other hand “msg.info.hello” will only go to the first queue, and “error.msg.hello” only to the second. “error.msg.warn” will be delivered to the second queue only once, even though it matches two bindings. “quick.brown.fox” doesn’t match any binding so it will be discarded.

Messages which do not match any binding will be lost or discarded.

Prerequisites

Java 1.8+, Maven 3.8.2, Spring Boot 2.6.2, Spring AMQP 2.6.2, RabbitMQ Server 3.9.4 – 3.9.13

Project Setup

You can use the following pom.xml file for your maven based project:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-rabbitmq-pattern</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>16</maven.compiler.source>
		<maven.compiler.target>16</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

application.properties

The following configuration (src/main/resources/application.properties) file is defined to configure the message pattern and TopicExchange name, even you can hardcode the following values in your configuration class.

topic.xchange.name=roytuts.topic
pattern.key.info=info
pattern.key.warn=warn
pattern.key.error=error

Sender

The following sender class sends messages based on message patterns. I have three different patterns for messages and I am randomly selecting the pattern from a list of strings using Random class. This piece of code shows how to select random strings from an array or a list of strings in Java. I am sending both message pattern and integer value to the queue through TopicExchange.

package com.roytuts.spring.rabbitmq.pattern.sender;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

public class Sender {

	@Autowired
	private TopicExchange exchange;

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Value("${pattern.key.info}")
	private String patternKeyInfo;

	@Value("${pattern.key.warn}")
	private String routingKeyWarn;

	@Value("${pattern.key.error}")
	private String routingKeyError;

	public void sendMsg(final Integer number) {
		final List<String> routings = Arrays.asList(patternKeyInfo, routingKeyWarn, routingKeyError);
		final String routing = routings.get(new Random().nextInt(routings.size()));

		rabbitTemplate.convertAndSend(exchange.getName(), routing, routing + number);

		System.out.println("Sent: " + number);
	}

}

Receiver

The receiver will receive the message as a string and from each of the string I am extracting the integer value discarding the string part.

package com.roytuts.spring.rabbitmq.pattern.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Receiver {

	@RabbitListener(queues = "#{deleteQueue1.name}")
	public void receive1(String msg) throws InterruptedException {
		receiveMsg(msg, 1);
	}

	@RabbitListener(queues = "#{deleteQueue2.name}")
	public void receive2(String msg) throws InterruptedException {
		receiveMsg(msg, 2);
	}

	public void receiveMsg(final String msg, int receiver) {
		StopWatch stopWatch = new StopWatch();
		stopWatch.start();

		System.out.println("Received (" + receiver + "): " + msg);

		Integer integer = extractInt(msg);

		System.out.println("Integer Number (" + receiver + "): " + integer);

		stopWatch.stop();

		System.out.println("Consumer(" + receiver + ") Done in " + stopWatch.getTotalTimeSeconds() + "s");
	}

	private Integer extractInt(String string) {
		String intValue = string.replaceAll("[^0-9]", "");
		return Integer.valueOf(intValue);
	}

}

Config

The Config class defines various Spring Beans for this application. The TopicExchange is configured for broadcasting messages based on patterns. I have defined bindings for queues with pattern keys.

I have configured AnonymousQueue, which creates a non-durable, exclusive, auto-delete queue with a generated name.

package com.roytuts.spring.rabbitmq.pattern.config;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.roytuts.spring.rabbitmq.pattern.receiver.Receiver;
import com.roytuts.spring.rabbitmq.pattern.sender.Sender;

@Configuration
public class Config {

	@Value("${topic.xchange.name}")
	private String topicXchangeName;

	@Value("${pattern.key.info}")
	private String patternKeyInfo;

	@Value("${pattern.key.warn}")
	private String patternKeyWarn;

	@Value("${pattern.key.error}")
	private String patternKeyError;

	@Bean
	public TopicExchange topicExchange() {
		return new TopicExchange(topicXchangeName);
	}

	@Bean
	public Queue deleteQueue1() {
		return new AnonymousQueue();
	}

	@Bean
	public Queue deleteQueue2() {
		return new AnonymousQueue();
	}

	@Bean
	public Binding binding11(TopicExchange exchange, Queue deleteQueue1) {
		return BindingBuilder.bind(deleteQueue1).to(exchange).with("*." + patternKeyInfo + ".*");
	}

	@Bean
	public Binding binding12(TopicExchange exchange, Queue deleteQueue1) {
		return BindingBuilder.bind(deleteQueue1).to(exchange).with("*.*." + patternKeyWarn);
	}

	@Bean
	public Binding binding21(TopicExchange exchange, Queue deleteQueue2) {
		return BindingBuilder.bind(deleteQueue2).to(exchange).with(patternKeyError + ".#");
	}

	@Bean
	public Receiver receiver() {
		return new Receiver();
	}

	@Bean
	public Sender sender() {
		return new Sender();
	}

}

Main Class

A class with main method having @SpringBootApplication annotation will deploy the application in the embedded Tomcat container.

package com.roytuts.spring.rabbitmq.pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.rabbitmq.pattern.sender.Sender;

@SpringBootApplication
public class RabbitMqPatternApp implements CommandLineRunner {

	@Autowired
	private Sender sender;

	public static void main(String[] args) {
		SpringApplication.run(RabbitMqPatternApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		for (int i = 1000; i < 1010; i++) {
			sender.sendMsg(i);
		}
	}

}

Testing Message Pattern

Executing the above main class will produce the following output. The only message pattern error* is matched against the binding and all other messages are discarded.

Sent: 1000
Sent: 1001
Sent: 1002
Sent: 1003
Sent: 1004
Sent: 1005
Sent: 1006
Sent: 1007
Sent: 1008
Sent: 1009
Received (2): error1003
Integer Number (2): 1003
Consumer(2) Done in 1.609E-4s
Received (2): error1006
Integer Number (2): 1006
Consumer(2) Done in 9.46E-5s
Received (2): error1007
Integer Number (2): 1007
Consumer(2) Done in 6.09E-5s

You will also find the following queues are created in RabbitMQ server.

rabbitmq message pattern

Source Code

Download

Leave a Reply

Your email address will not be published.