Introduction
In this tutorial I am going to build an example of the publish/subscribe model using Spring Boot and the RabbitMQ message broker. In the publish/subscribe model, each message is delivered to multiple consumers. I am going to implement this with the fanout pattern, so every published message is broadcast to all receivers.
Related Posts:
- RabbitMQ Spring Boot Producer Consumer Example
- RabbitMQ Spring Boot Work Queues Example
- RabbitMQ Spring Boot Routing Example
- RabbitMQ Spring Boot Message Pattern Example
- RabbitMQ Spring Boot RPC Example
- RabbitMQ Spring Boot Retry and Error Handling
Exchanges
The messaging model in RabbitMQ is built around exchanges. The core idea is that the producer never sends a message directly to a queue. In fact, the producer often does not even know whether a message will be delivered to any queue at all.
Therefore, the producer can only send messages to an exchange. An exchange receives messages from producers on one side and pushes them to queues on the other. The exchange must know what to do with each message it receives: should it be appended to a particular queue, should it be appended to many queues, or should it be discarded? These rules are defined by the exchange type. The available exchange types are direct, topic, headers and fanout.
In this example, I am going to use the fanout exchange type, which simply broadcasts every message it receives to all the queues bound to it.
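The broadcast behaviour described above can be sketched in plain Java without a broker. This is only an illustrative model, not RabbitMQ or Spring AMQP code: the `InMemoryFanout` class and its `bind` and `publish` methods are hypothetical names invented for this sketch. It shows that every bound queue receives its own copy of a message and that the routing key plays no part in the decision.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class FanoutSketch {

    /** Hypothetical in-memory stand-in for a fanout exchange; not a RabbitMQ API. */
    public static class InMemoryFanout {
        private final List<Queue<Integer>> boundQueues = new ArrayList<>();

        /** Registers a queue so that it receives every message published from now on. */
        public void bind(Queue<Integer> queue) {
            boundQueues.add(queue);
        }

        /** Copies the message to every bound queue; the routing key is accepted but ignored. */
        public int publish(String routingKey, Integer message) {
            for (Queue<Integer> queue : boundQueues) {
                queue.add(message);
            }
            return boundQueues.size(); // number of queues that received a copy
        }
    }

    public static void main(String[] args) {
        InMemoryFanout exchange = new InMemoryFanout();
        Queue<Integer> queue1 = new ArrayDeque<>();
        Queue<Integer> queue2 = new ArrayDeque<>();
        exchange.bind(queue1);
        exchange.bind(queue2);

        exchange.publish("", 10000);
        exchange.publish("any.key", 10001); // a different key changes nothing

        System.out.println("queue1 = " + queue1); // queue1 = [10000, 10001]
        System.out.println("queue2 = " + queue2); // queue2 = [10000, 10001]
    }
}
```

A direct or topic exchange would compare the routing key against each binding before copying the message. The fanout type skips that comparison, which is why the real sender in this tutorial can pass an empty string as the key.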
Prerequisites
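Java 1.8+ (the pom.xml below compiles for Java 16, so lower maven.compiler.source and maven.compiler.target if you use an older JDK), Maven 3.8.2, Spring Boot 2.6.2, Spring Boot AMQP 2.6.2, RabbitMQ Server 3.9.4 – 3.9.13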
Project Setup
I am going to create a Maven based project for this example. The following pom.xml file can be used for your project too.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.roytuts</groupId>
    <artifactId>spring-rabbitmq-publish-subscribe</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>16</maven.compiler.source>
        <maven.compiler.target>16</maven.compiler.target>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Sender
The following producer class sends messages to a fanout exchange, and the fanout exchange appends each message to the queues bound to it. So, the producer never sends directly to a queue. The message is published to the named fanout exchange instead of the nameless (default) exchange. A routing key must still be supplied when sending, but a fanout exchange ignores its value, so an empty string is used.
package com.roytuts.spring.rabbitmq.publish;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class Sender {

    @Autowired
    private FanoutExchange exchange;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(final Integer number) {
        // The routing key ("") is required by the API but ignored by a fanout exchange.
        rabbitTemplate.convertAndSend(exchange.getName(), "", number);
        System.out.println("Sent: " + number);
    }

}
The FanoutExchange bean has to be declared in the configuration class (shown in the Configurations section), otherwise the sender would publish messages to a non-existing exchange.
Receiver
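The receiver class consumes messages from the queues. A message is lost if no queue is bound to the exchange at the time it is published. Because the queues in this example are auto-delete, a queue disappears when its consumer disconnects, so messages published while no consumer is listening are discarded.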
package com.roytuts.spring.rabbitmq.subscribe;

import java.math.BigInteger;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Receiver {

    // The SpEL expressions resolve the generated names of the anonymous queue beans.
    @RabbitListener(queues = "#{deleteQueue1.name}")
    public void receive1(Integer num) throws InterruptedException {
        receiveMsg(num, 1);
    }

    @RabbitListener(queues = "#{deleteQueue2.name}")
    public void receive2(Integer num) throws InterruptedException {
        receiveMsg(num, 2);
    }

    public void receiveMsg(final Integer number, int receiver) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        System.out.println("Received (" + receiver + "): " + number);
        BigInteger nextPrime = nextPrime(number);
        System.out.println("Next Prime Number: " + nextPrime);
        stopWatch.stop();
        System.out.println("Consumer(" + receiver + ") Done in " + stopWatch.getTotalTimeSeconds() + "s");
    }

    // Returns the first probable prime strictly greater than the given number.
    private BigInteger nextPrime(int number) {
        BigInteger veryBig = new BigInteger(String.valueOf(number));
        return veryBig.nextProbablePrime();
    }

}
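The work each receiver does per message is a call to BigInteger.nextProbablePrime(), which returns the first probable prime strictly greater than its input. The small standalone sketch below isolates that call so the results can be checked without a broker. The class name `NextPrimeSketch` is made up for illustration, and the expected values in the comments are the same ones that appear in the test output later in this post.

```java
import java.math.BigInteger;

public class NextPrimeSketch {

    /** Returns the first probable prime strictly greater than the given number. */
    public static BigInteger nextPrime(int number) {
        return BigInteger.valueOf(number).nextProbablePrime();
    }

    public static void main(String[] args) {
        System.out.println(nextPrime(10000)); // 10007
        System.out.println(nextPrime(10007)); // 10009 (the input itself is never returned)
        System.out.println(nextPrime(10009)); // 10037
    }
}
```

Because the result is strictly greater than the input, a message that is itself prime, such as 10007, yields the next prime after it.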
Exchange Name
The exchange name is declared in the src/main/resources/application.properties file:
fanout.name=roytuts.fanout
Configurations
The configuration class configures the beans for the application. The important point here is that the queues are not given explicit names: both queues are declared as anonymous. A queue name matters when a producer and a consumer need to share the same queue, but that is not the case with a fanout exchange.
With a fanout exchange, each receiver hears all messages, not just a subset of them. Also, the receivers are interested only in messages that are currently flowing, not in old ones.
For this purpose, you need a fresh, empty queue each time you connect. Such a queue can be created with a random name, or you can let the server choose a random name for you. In addition, the queue should be deleted automatically once the consumer disconnects. AnonymousQueue creates exactly such a non-durable, exclusive, auto-delete queue with a generated random name. Once the queues and the fanout exchange have been defined, the next step is to bind them together using the BindingBuilder.bind() method so that the fanout exchange delivers messages to the queues.
package com.roytuts.spring.rabbitmq.publish.subscribe.config;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.roytuts.spring.rabbitmq.publish.Sender;
import com.roytuts.spring.rabbitmq.subscribe.Receiver;

@Configuration
public class Config {

    // Exchange name read from application.properties
    @Value("${fanout.name}")
    private String fanoutName;

    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange(fanoutName);
    }

    // Non-durable, exclusive, auto-delete queue with a generated name
    @Bean
    public Queue deleteQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue deleteQueue2() {
        return new AnonymousQueue();
    }

    // Bind each queue to the fanout exchange; no routing key is needed
    @Bean
    public Binding binding1(FanoutExchange fanout, Queue deleteQueue1) {
        return BindingBuilder.bind(deleteQueue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue deleteQueue2) {
        return BindingBuilder.bind(deleteQueue2).to(fanout);
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }

}
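The reason for using fresh queues, and the earlier remark that a message is lost when no queue is bound, can be illustrated with a small broker-free sketch. It is a simplified model under stated assumptions, not Spring AMQP code: `LateSubscriberSketch`, `bindNewQueue` and `publish` are hypothetical names, and plain lists stand in for queues. A queue bound after a message was published never sees that message.

```java
import java.util.ArrayList;
import java.util.List;

public class LateSubscriberSketch {

    // Hypothetical stand-in for the queues currently bound to a fanout exchange.
    private final List<List<Integer>> boundQueues = new ArrayList<>();

    /** Binds a fresh, empty queue and returns it. */
    public List<Integer> bindNewQueue() {
        List<Integer> queue = new ArrayList<>();
        boundQueues.add(queue);
        return queue;
    }

    /** Returns true if at least one queue received the message, false if it was dropped. */
    public boolean publish(Integer message) {
        boundQueues.forEach(queue -> queue.add(message));
        return !boundQueues.isEmpty();
    }

    public static void main(String[] args) {
        LateSubscriberSketch exchange = new LateSubscriberSketch();

        boolean delivered = exchange.publish(10000); // nothing is bound yet, so it is dropped
        List<Integer> lateQueue = exchange.bindNewQueue();
        exchange.publish(10001);

        System.out.println("first message delivered: " + delivered); // first message delivered: false
        System.out.println("late queue contents: " + lateQueue);     // late queue contents: [10001]
    }
}
```

In the real application, Spring declares the queues and bindings from the Config class when the connection is opened, so both queues are normally bound before the sender publishes its first message.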
Spring Boot Main Class
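A class with a main method and the @SpringBootApplication annotation is enough to start the application. The class also implements CommandLineRunner, so its run() method publishes ten messages once the application has started.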
package com.roytuts.spring.rabbitmq.publish.subscribe;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.rabbitmq.publish.Sender;

@SpringBootApplication
public class RabbitMqPublishSubscribeApp implements CommandLineRunner {

    @Autowired
    private Sender sender;

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqPublishSubscribeApp.class, args);
    }

    // Publishes the numbers 10000 to 10009 once the application context is ready.
    @Override
    public void run(String... args) throws Exception {
        for (int i = 10000; i < 10010; i++) {
            sender.sendMsg(i);
        }
    }

}
Testing RabbitMQ Publish Subscribe
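Running the above main class produces output similar to the following. The timings and the order in which the two consumers print their lines vary between runs: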
Sent: 10000
Sent: 10001
Sent: 10002
Sent: 10003
Sent: 10004
Sent: 10005
Sent: 10006
Sent: 10007
Sent: 10008
Sent: 10009
Received (1): 10000
Received (2): 10000
Next Prime Number: 10007
Next Prime Number: 10007
Consumer(1) Done in 0.0091087s
Consumer(2) Done in 0.0090217s
Received (1): 10001
Received (2): 10001
Next Prime Number: 10007
Consumer(2) Done in 0.0029658s
Next Prime Number: 10007
Consumer(1) Done in 0.0039918s
Received (2): 10002
Received (1): 10002
Next Prime Number: 10007
Next Prime Number: 10007
Consumer(2) Done in 0.0034291s
Consumer(1) Done in 0.0029921s
Received (2): 10003
Received (1): 10003
Next Prime Number: 10007
Consumer(2) Done in 0.0033681s
Next Prime Number: 10007
Consumer(1) Done in 0.0034605s
Received (2): 10004
Received (1): 10004
Next Prime Number: 10007
Consumer(2) Done in 0.0028647s
Next Prime Number: 10007
Consumer(1) Done in 0.0030668s
Received (2): 10005
Received (1): 10005
Next Prime Number: 10007
Consumer(2) Done in 0.001846s
Next Prime Number: 10007
Consumer(1) Done in 0.0019808s
Received (2): 10006
Received (1): 10006
Next Prime Number: 10007
Consumer(1) Done in 0.0018491s
Next Prime Number: 10007
Consumer(2) Done in 0.0023641s
Received (1): 10007
Received (2): 10007
Next Prime Number: 10009
Consumer(1) Done in 0.0020751s
Next Prime Number: 10009
Consumer(2) Done in 0.0020344s
Received (1): 10008
Received (2): 10008
Next Prime Number: 10009
Consumer(1) Done in 0.0020453s
Next Prime Number: 10009
Consumer(2) Done in 0.0021397s
Received (2): 10009
Received (1): 10009
Next Prime Number: 10037
Next Prime Number: 10037
Consumer(2) Done in 0.0016814s
Consumer(1) Done in 0.0017667s
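In the output above, each consumer receives every number in publishing order, while the lines of consumer 1 and consumer 2 interleave differently from message to message. A likely explanation is that the two listener methods are driven by separate threads, each reading its own queue. The following broker-free sketch mimics that setup with two BlockingQueue instances and two threads. All names in it (`InterleavingSketch`, `startConsumer`, `run`) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InterleavingSketch {

    /** Starts a thread that takes `count` messages from its own queue and records them. */
    static Thread startConsumer(int id, BlockingQueue<Integer> queue, int count, List<Integer> seen) {
        Thread thread = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    Integer number = queue.take(); // blocks until a message arrives
                    System.out.println("Received (" + id + "): " + number);
                    seen.add(number);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
        return thread;
    }

    /** Publishes 10000..10009 to both queues and returns what each consumer saw. */
    public static List<List<Integer>> run() {
        BlockingQueue<Integer> queue1 = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> queue2 = new LinkedBlockingQueue<>();
        List<Integer> seen1 = new ArrayList<>();
        List<Integer> seen2 = new ArrayList<>();
        Thread consumer1 = startConsumer(1, queue1, 10, seen1);
        Thread consumer2 = startConsumer(2, queue2, 10, seen2);

        // Fanout: every message is copied to both queues.
        for (int i = 10000; i < 10010; i++) {
            queue1.add(i);
            queue2.add(i);
        }

        try {
            consumer1.join();
            consumer2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return Arrays.asList(seen1, seen2);
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = run();
        System.out.println("Consumer 1 saw: " + seen.get(0));
        System.out.println("Consumer 2 saw: " + seen.get(1));
    }
}
```

The "Received" lines from the two threads may appear in a different relative order on every run, but each consumer's own list always ends up as 10000 through 10009 in order, which matches the pattern in the output above.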
In the RabbitMQ management console you will find two auto-generated queues, both of which are auto-delete and exclusive.