In this tutorial I will show you how to create durable subscribers with ActiveMQ using the JMS API. I will discuss how the JMS API supports reliable messaging through durable subscriptions, and I will show you how to build the application using both Maven and Gradle build tools.
In the publish/subscribe messaging domain, a non-durable subscriber misses every message that is published while it is inactive. If we create a durable subscriber instead, the broker retains messages published during its absence and delivers them when it reconnects, which makes reliable messaging possible.
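To make the difference concrete, here is a minimal in-memory sketch of the two kinds of subscription. It is a toy model, not ActiveMQ code: the ToyTopic class, its methods and the subscriber names are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Demo driver: both subscribers are offline while the message is published.
class DurableVsNonDurableSketch {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.subscribe("durable-sub", true);
        topic.subscribe("plain-sub", false);
        topic.disconnect("durable-sub");
        topic.disconnect("plain-sub");
        topic.publish("hello");
        topic.subscribe("durable-sub", true);
        topic.subscribe("plain-sub", false);
        System.out.println("durable received: " + topic.receiveAll("durable-sub"));
        System.out.println("non-durable received: " + topic.receiveAll("plain-sub"));
    }
}

// Toy model of a topic (hypothetical, NOT how ActiveMQ is implemented).
class ToyTopic {
    // Backlogs of durable subscriptions survive a disconnect.
    private final Map<String, List<String>> durableBacklogs = new HashMap<>();
    // Inboxes of non-durable subscribers exist only while they are connected.
    private final Map<String, List<String>> liveInboxes = new HashMap<>();

    void subscribe(String name, boolean durable) {
        if (durable) {
            // Reconnecting keeps whatever accumulated while offline.
            durableBacklogs.computeIfAbsent(name, k -> new ArrayList<>());
        } else {
            // A non-durable subscriber always starts with an empty inbox.
            liveInboxes.put(name, new ArrayList<>());
        }
    }

    void disconnect(String name) {
        // Only the non-durable inbox is discarded; durable backlogs are kept.
        liveInboxes.remove(name);
    }

    void publish(String message) {
        for (List<String> inbox : liveInboxes.values()) {
            inbox.add(message);
        }
        for (List<String> backlog : durableBacklogs.values()) {
            backlog.add(message);
        }
    }

    // Returns and clears everything pending for the given subscriber.
    List<String> receiveAll(String name) {
        List<String> pending = durableBacklogs.containsKey(name)
                ? durableBacklogs.get(name)
                : liveInboxes.get(name);
        if (pending == null) {
            return new ArrayList<>();
        }
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }
}
```

In this sketch only the durable subscription still holds the message after reconnecting. A real broker additionally deals with persistence, acknowledgements and expiry, which the toy model ignores.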
The non-durable subscriber example can be found in publish/subscribe domain examples:
- Spring JMS and ActiveMQ Integration – publish/subscribe domain
- JMS Client using JBoss 7 – Publish/Subscribe Messaging
- Configure JMS Client using GlassFish 3
For more on durable subscribers, you can read JMS Concepts – Persistent and Durable.
Please go through the tutorial Apache ActiveMQ Configuration in Windows before proceeding below.
Prerequisites
Eclipse 4.12, JDK 1.8 or 12, Maven 3.6.1, Gradle 5.6, ActiveMQ 5.15.10
Creating Project
If you are creating a Maven-based project, use the pom.xml file below for a standalone Maven project in Eclipse.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.roytuts</groupId>
<artifactId>durable-subscription-jms-activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Use a single value: 1.8, or 12 when building with JDK 12 -->
<java.version>1.8</java.version>
<activemq.version>5.15.11</activemq.version>
</properties>
<dependencies>
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
If you are creating a Gradle-based project in Eclipse, use the build.gradle script below to include the required dependencies.
plugins {
id 'java-library'
}
sourceCompatibility = 12
targetCompatibility = 12
repositories {
mavenCentral()
}
dependencies {
implementation('org.apache.activemq:activemq-broker:5.15.11')
}
JNDI Configurations
Create the jndi.properties file below in the classpath directory src/main/resources.
We need the context factory class to perform lookups, the broker URL, and the topic name as the destination where messages will be published.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
topic.topic/topicName=IN_TOPIC
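The last entry follows the pattern topic.<JNDI name>=<physical topic name>, so the JNDI name topic/topicName resolves to the physical topic IN_TOPIC. The following sketch only illustrates how to read that convention from the file contents using java.util.Properties. The class and method names are hypothetical, and this is not how ActiveMQInitialContextFactory itself is implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustration only: extracts "topic.<jndiName>=<physicalName>" entries.
class JndiTopicEntriesSketch {

    // Maps each JNDI name to the physical topic name it is bound to.
    static Map<String, String> topicBindings(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = "topic.";
        Map<String, String> bindings = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                bindings.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return bindings;
    }

    public static void main(String[] args) {
        String text = "java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory\n"
                + "java.naming.provider.url=tcp://localhost:61616\n"
                + "topic.topic/topicName=IN_TOPIC\n";
        // prints {topic/topicName=IN_TOPIC}
        System.out.println(topicBindings(text));
    }
}
```

The key on the left of the mapping is the name passed to initialContext.lookup(...) in the classes that follow.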
Message Publisher
Create the publisher class below, which publishes a message to the topic IN_TOPIC. The comments in the code explain each step.
The class includes a main method so that the example is easier to test.
package com.roytuts.durable.subscription.jms.activemq.publisher;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class MessagePublisher {
public static void main(String[] args) {
MessagePublisher publisher = new MessagePublisher();
publisher.publishMessage("This sample message is consumed by subscriber");
}
public void publishMessage(final String msg) {
InitialContext initialContext = null;
TopicConnectionFactory connectionFactory;
TopicConnection connection = null;
TopicPublisher publisher;
TopicSession session;
Topic topic;
try {
// Step 1. Create an initial context to perform the JNDI lookup.
initialContext = new InitialContext();
// Step 2. Look-up the JMS topic
topic = (Topic) initialContext.lookup("topic/topicName");
// Step 3. Look-up the JMS Topic connection factory
connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");
// Step 4. Create a JMS Topic connection
connection = connectionFactory.createTopicConnection();
// Step 5. Start the connection
connection.start();
// step 6. Create Topic session
session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
// step 7. Create publisher
publisher = session.createPublisher(topic);
// publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// Step 8. Create a text message
TextMessage message = session.createTextMessage(msg);
// Step 9. Publish the text message to the topic
publisher.publish(message);
} catch (JMSException | NamingException ex) {
ex.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (initialContext != null) {
try {
initialContext.close();
} catch (NamingException e) {
e.printStackTrace();
}
}
}
}
}
Durable Subscribers
We are going to create two durable subscribers that subscribe to the topic and consume the message when it arrives.
A durable subscriber needs a client ID set on its connection.
We have also included a main method in each subscriber to test our application.
package com.roytuts.durable.subscription.jms.activemq.subscriber;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class DurableSubscriberOne {
public static void main(String[] args) {
DurableSubscriberOne durableSubscriber = new DurableSubscriberOne();
durableSubscriber.receiveMessage();
}
public void receiveMessage() {
InitialContext initialContext = null;
TopicConnectionFactory connectionFactory;
TopicConnection connection = null;
TopicSubscriber subscriber;
TopicSession session = null;
Topic topic;
try {
// Step 1. Create an initial context to perform the JNDI lookup.
initialContext = new InitialContext();
// Step 2. Look-up the JMS topic
topic = (Topic) initialContext.lookup("topic/topicName");
// Step 3. Look-up the JMS Topic connection factory
connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");
// Step 4. Create a JMS Topic connection
connection = connectionFactory.createTopicConnection();
// Step 5. Set the client-id on the connection
// in case of non-durable subscriber, please remove the below line
connection.setClientID("durable-client-one");
// step 6. Start the connection
connection.start();
// step 7. Create Topic session
session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
// step 8. Create the durable subscriber
// in case of non-durable subscriber, please use the below commented line
// subscriber=session.createSubscriber(topic);
subscriber = session.createDurableSubscriber(topic, "durableSubscriber");
// Step 9. Consume the message (receive() blocks until a message arrives)
Message message = subscriber.receive();
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println(
"DurableSubscriber received a message published by Publisher : " + textMessage.getText());
} else if (message == null) {
System.out.println(
"DurableSubscriber fails to receive the message sent by the publisher due to a timeout.");
} else {
throw new JMSException("Message must be a type of TextMessage");
}
} catch (JMSException | NamingException ex) {
ex.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (initialContext != null) {
try {
initialContext.close();
} catch (NamingException e) {
e.printStackTrace();
}
}
}
}
}
The second subscriber differs only in its class name and client ID:
package com.roytuts.durable.subscription.jms.activemq.subscriber;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class DurableSubscriberTwo {
public static void main(String[] args) {
DurableSubscriberTwo durableSubscriber = new DurableSubscriberTwo();
durableSubscriber.receiveMessage();
}
public void receiveMessage() {
InitialContext initialContext = null;
TopicConnectionFactory connectionFactory;
TopicConnection connection = null;
TopicSubscriber subscriber;
TopicSession session = null;
Topic topic;
try {
// Step 1. Create an initial context to perform the JNDI lookup.
initialContext = new InitialContext();
// Step 2. Look-up the JMS topic
topic = (Topic) initialContext.lookup("topic/topicName");
// Step 3. Look-up the JMS Topic connection factory
connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");
// Step 4. Create a JMS Topic connection
connection = connectionFactory.createTopicConnection();
// Step 5. Set the client-id on the connection
// in case of non-durable subscriber, please remove the below line
connection.setClientID("durable-client-two");
// step 6. Start the connection
connection.start();
// step 7. Create Topic session
session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
// step 8. Create the durable subscriber
// in case of non-durable subscriber, please use the below commented line
// subscriber=session.createSubscriber(topic);
subscriber = session.createDurableSubscriber(topic, "durableSubscriber");
// Step 9. Consume the message (receive() blocks until a message arrives)
Message message = subscriber.receive();
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println(
"DurableSubscriber received a message published by Publisher : " + textMessage.getText());
} else if (message == null) {
System.out.println(
"DurableSubscriber fails to receive the message sent by the publisher due to a timeout.");
} else {
throw new JMSException("Message must be a type of TextMessage");
}
} catch (JMSException | NamingException ex) {
ex.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (initialContext != null) {
try {
initialContext.close();
} catch (NamingException e) {
e.printStackTrace();
}
}
}
}
}
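Both subscribers pass the same subscription name, durableSubscriber, but set different client IDs. In JMS a durable subscription is identified by the combination of client ID and subscription name, so these are two separate subscriptions and each one receives its own copy of the message. The toy registry below sketches that identity rule. It is not broker code, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Toy registry (hypothetical): a subscription is keyed by client ID + name.
class SubscriptionRegistrySketch {
    private final Set<String> subscriptions = new HashSet<>();

    // Returns true if a new durable subscription was created,
    // false if one with the same identity already existed and is resumed.
    boolean subscribe(String clientId, String subscriptionName) {
        return subscriptions.add(clientId + "|" + subscriptionName);
    }

    int size() {
        return subscriptions.size();
    }

    public static void main(String[] args) {
        SubscriptionRegistrySketch registry = new SubscriptionRegistrySketch();
        // Same subscription name, different client IDs: two subscriptions.
        System.out.println(registry.subscribe("durable-client-one", "durableSubscriber")); // true
        System.out.println(registry.subscribe("durable-client-two", "durableSubscriber")); // true
        // Same client ID and name again: the existing subscription is resumed.
        System.out.println(registry.subscribe("durable-client-one", "durableSubscriber")); // false
        System.out.println(registry.size()); // 2
    }
}
```

This is also why the client ID must stay the same across runs: a subscriber that reconnects with a different client ID would start a new subscription instead of resuming the old one.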
Testing the Application
While your ActiveMQ broker is running, first run the subscriber classes (DurableSubscriberOne and DurableSubscriberTwo), then run MessagePublisher.
Once you run the MessagePublisher class, you will see the below output in the console of each subscriber:
DurableSubscriber received a message published by Publisher : This sample message is consumed by subscriber
Now to test on broker restart, do the following.
First run the DurableSubscriberOne and DurableSubscriberTwo classes. Stop the broker, then start it again. Now run the MessagePublisher class. You should see the same output as shown above in the console.
Thanks for reading.