Spring JMS and ActiveMQ Integration – point-to-point domain

Spring JMS And ActiveMQ

This tutorial will show you how you can integrate Spring and Apache ActiveMQ using point-to-point messaging domain. In point to point communication there are exactly one message producer and one message consumer. The producer produces message or puts the message into the broker and the consumer will consume the message from the broker which was produced by the producer.

For more information on point-to-point messaging system please read tutorial JMS Client Configurations In GlassFish 3

Prerequisites

Read tutorial ActiveMQ Configuration In Windows for configuring ActiveMQ but you do not need to create any Queue.

Java 19, ActiveMQ 5.18.3, Spring Boot 3.1.5, Maven 3.8.5

Project Setup

I am going to create a maven based project and the name of the project is spring-jms-activemq-point-to-point.

The following pom.xml file can be used for the project.

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-jms-activemq-point-to-point</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>19</maven.compiler.source>
		<maven.compiler.target>19</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.1.5</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Config File

I will create application.properties file under class path directory src/main/resources with the following content:

broker.url=tcp://localhost:61616
queue.name=IN_QUEUE

The property file just declares ActiveMQ broker URL and queue name as key/value pairs that will be used for messaging system.

Message Producer

Create a class called MessageProducer that will produce message or send message to the destination – Queue.

I am using here JmsTemplate API to send the message to the queue provided by Spring framework.

@Component
public class MessageProducer {

	@Autowired
	private JmsTemplate jmsTemplate;

	public void sendMessageToDefaultDestination(final String message) {
		jmsTemplate.convertAndSend(message);
	}

}

Message Consumer

Create a class called MessageDefaultConsumer that will receive message from the destination – Queue.

I am just accepting text message from the queue and for all other message it will throw an exception.

I have marked consumer with @Component annotation to let container create the instance from the bean class.

@Component
public class MessageConsumer implements MessageListener {

	@Override
	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			try {
				String msg = ((TextMessage) message).getText();
				System.out.println("Message has been consumed : " + msg);
			} catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		} else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}

}

Configuration

Next I need to configure our application to create various beans such as Connection Factory, Queue, Message Container Factory etc.

@Configuration
public class JmsConfig {

	@Value("${broker.url}")
	private String brokerUrl;

	@Value("${queue.name}")
	private String queueName;

	@Autowired
	private MessageConsumer messageConsumer;

	@Bean
	public ConnectionFactory connectionFactory() {
		return new ActiveMQConnectionFactory(brokerUrl);
	}

	@Bean
	public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {
		return new CachingConnectionFactory(connectionFactory);
	}

	@Bean
	public Queue queue() {
		return new ActiveMQQueue(queueName);
	}

	@Bean
	public JmsTemplate jmsTemplate(CachingConnectionFactory cachingConnectionFactory, Queue queue) {
		JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
		jmsTemplate.setDefaultDestination(queue);

		return jmsTemplate;
	}

	@Bean
	public MessageListenerContainer messageListenerContainer(CachingConnectionFactory cachingConnectionFactory,
			Queue queue) {
		DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
		defaultMessageListenerContainer.setConnectionFactory(cachingConnectionFactory);
		defaultMessageListenerContainer.setDestination(queue);
		defaultMessageListenerContainer.setMessageListener(messageConsumer);

		return defaultMessageListenerContainer;
	}

}

Main Class

A class is having a main method and @SpringBootApplication annotation will deploy the application into the embedded Tomcat server.

@SpringBootApplication
public class SpringJmsActiveMqPointToPointApp implements CommandLineRunner {

	@Autowired
	private MessageProducer messageProducer;

	public static void main(String[] args) {
		SpringApplication.run(SpringJmsActiveMqPointToPointApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		messageProducer.sendMessageToDefaultDestination("Send this message to default destination.");
	}

}

Testing the Spring ActiveMQ Point-to-Point Application

Running the main class you will get the following output in the console:

Message has been consumed : Send this message to default destination.

Verify in ActiveMQ Web Console

Click on Queues in ActiveMQ Web Console. You will not see any message in the IN_QUEUE because as soon as you ran the MessageProducer the message has been consumed by the MessageConsumer and now you will see one message has been dequeued.

I do not need to run the MessageConsumer because it is an asynchronous messaging system and MessageConsumer is already registered to the Spring’s DefaultMessageListenerContainer.

So when a message arrives to the Queue, the message gets automatically consumed by the consumer’s onMessage() method.

ActiveMQ SPring JMS integration

Source Code

Download

Leave a Reply

Your email address will not be published. Required fields are marked *