Creating Durable Subscription in JMS using ActiveMQ

In this tutorial I will show you how to create durable subscribers in JMS using ActiveMQ. I will discuss how the JMS API supports reliable messaging through durable subscriptions, and how to build the application with either Maven or Gradle.

In the publish/subscribe messaging domain, a non-durable subscriber misses every message that is published to the destination while the subscriber is not active. A durable subscriber does not: the broker retains the messages published while it is inactive and delivers them when it reconnects, which makes the messaging reliable.
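To make the difference concrete, here is a minimal in-memory sketch of the delivery rule. It is a toy model, not the JMS API: `ToyTopic` and all of its method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory topic (hypothetical, NOT JMS): it only models who gets a message.
class ToyTopic {
    // One backlog per durable subscription; it survives while the subscriber is offline.
    private final Map<String, Queue<String>> durableBacklogs = new HashMap<>();
    // Inboxes of the non-durable subscribers that are connected right now.
    private final Map<String, List<String>> liveInboxes = new HashMap<>();

    void subscribeDurable(String subscriptionId) {
        durableBacklogs.putIfAbsent(subscriptionId, new ArrayDeque<>());
    }

    void connectNonDurable(String subscriberId) {
        liveInboxes.put(subscriberId, new ArrayList<>());
    }

    void disconnectNonDurable(String subscriberId) {
        liveInboxes.remove(subscriberId);
    }

    // Every durable backlog and every currently connected inbox receives the message.
    void publish(String message) {
        durableBacklogs.values().forEach(backlog -> backlog.add(message));
        liveInboxes.values().forEach(inbox -> inbox.add(message));
    }

    // Returns and clears what accumulated for a durable subscription.
    List<String> drainDurable(String subscriptionId) {
        List<String> delivered = new ArrayList<>();
        Queue<String> backlog = durableBacklogs.get(subscriptionId);
        if (backlog != null) {
            while (!backlog.isEmpty()) {
                delivered.add(backlog.poll());
            }
        }
        return delivered;
    }

    List<String> inboxOf(String subscriberId) {
        return liveInboxes.getOrDefault(subscriberId, Collections.emptyList());
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.subscribeDurable("durable");   // registered, then goes offline
        topic.connectNonDurable("plain");
        topic.disconnectNonDurable("plain"); // goes offline as well

        topic.publish("hello");              // published while both are offline

        topic.connectNonDurable("plain");    // both come back
        System.out.println("durable: " + topic.drainDurable("durable")); // durable: [hello]
        System.out.println("non-durable: " + topic.inboxOf("plain"));    // non-durable: []
    }
}
```

In a real broker the backlog lives on the broker side, which is why the subscriber has to identify itself when it comes back.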

The non-durable subscriber example can be found in the publish/subscribe domain examples.

For more on durable subscribers, you can read JMS Concepts – Persistent and Durable.

Please go through the tutorial Apache ActiveMQ Configuration in Windows before proceeding below.

Prerequisites

Eclipse 4.12, JDK 1.8 or 12, Maven 3.6.1, Gradle 5.6, ActiveMQ 5.15.10
Apache ActiveMQ Configuration in Windows

Creating Project

If you are creating a Maven-based project, use the pom.xml file below for a standalone Maven project in Eclipse.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	
	<groupId>com.roytuts</groupId>
	<artifactId>durable-subscription-jms-activemq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<!-- must be a single version: use 1.8, or 12 if you build with JDK 12 -->
		<java.version>1.8</java.version>
		<activemq.version>5.15.11</activemq.version>
	</properties>
	
	<dependencies>
		<!-- activemq -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>
	</dependencies>
	
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.1</version>
				<configuration>
					<source>${java.version}</source>
					<target>${java.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

If you are creating a Gradle-based project in Eclipse, use the build.gradle script below to include the required dependencies.

plugins {
    id 'java-library'
}

sourceCompatibility = 12
targetCompatibility = 12

repositories {
    jcenter()
}

dependencies {
    implementation('org.apache.activemq:activemq-broker:5.15.11')
}

JNDI Configurations

Create the jndi.properties file below in the classpath directory src/main/resources.

We need the context factory class to perform the JNDI lookup, and the broker URL and the topic name for the destination where messages will be published.

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
topic.topic/topicName=IN_TOPIC
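In the last line, the key prefix `topic.` marks the entry as a topic, the remainder of the key (`topic/topicName`) is the JNDI name used in the lookups, and the value (`IN_TOPIC`) is the physical topic name on the broker. The plain-Java sketch below mimics that convention with `java.util.Properties`. It is an illustration only, not ActiveMQ's own code, and `JndiTopicEntries` and `topicBindings` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: shows how "topic.<jndiName>=<physicalName>" entries can be read.
class JndiTopicEntries {
    private static final String TOPIC_PREFIX = "topic.";

    // Maps each JNDI name to the physical topic name it points at.
    static Map<String, String> topicBindings(Properties props) {
        Map<String, String> bindings = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(TOPIC_PREFIX)) {
                bindings.put(key.substring(TOPIC_PREFIX.length()), props.getProperty(key));
            }
        }
        return bindings;
    }

    public static void main(String[] args) throws IOException {
        String file =
                "java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory\n"
              + "java.naming.provider.url=tcp://localhost:61616\n"
              + "topic.topic/topicName=IN_TOPIC\n";

        Properties props = new Properties();
        props.load(new StringReader(file));

        System.out.println(topicBindings(props)); // {topic/topicName=IN_TOPIC}
    }
}
```

This is why the publisher and the subscribers look up `topic/topicName` while the messages travel through `IN_TOPIC`.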

Message Publisher

Create the publisher class below, which publishes a message to the topic IN_TOPIC. The comments in the code explain each step.

The class includes a main method so that the example is easy to test.

package com.roytuts.durable.subscription.jms.activemq.publisher;

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MessagePublisher {

	public static void main(String[] args) {
		MessagePublisher publisher = new MessagePublisher();
		publisher.publishMessage("This sample message is consumed by subscriber");
	}

	public void publishMessage(final String msg) {
		InitialContext initialContext = null;
		TopicConnectionFactory connectionFactory;
		TopicConnection connection = null;
		TopicPublisher publisher;
		TopicSession session;
		Topic topic;

		try {
			// Step 1. Create an initial context to perform the JNDI lookup.
			initialContext = new InitialContext();

			// Step 2. Look-up the JMS topic
			topic = (Topic) initialContext.lookup("topic/topicName");

			// Step 3. Look-up the JMS Topic connection factory
			connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");

			// Step 4. Create a JMS Topic connection
			connection = connectionFactory.createTopicConnection();

			// Step 5. Start the connection
			connection.start();

			// Step 6. Create Topic session
			session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

			// Step 7. Create publisher
			publisher = session.createPublisher(topic);
			// publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

			// Step 8. Create a text message
			TextMessage message = session.createTextMessage(msg);

			// Step 9. Publish the text message to the topic
			publisher.publish(message);
		} catch (JMSException | NamingException ex) {
			ex.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (initialContext != null) {
				try {
					initialContext.close();
				} catch (NamingException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

Durable Subscribers

We are going to create two durable subscribers that subscribe to the topic and consume a message when it arrives.

A durable subscriber needs a client ID on its connection.

Each subscriber also includes a main method so that we can test the application.

package com.roytuts.durable.subscription.jms.activemq.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class DurableSubscriberOne {

	public static void main(String[] args) {
		DurableSubscriberOne durableSubscriber = new DurableSubscriberOne();
		durableSubscriber.receiveMessage();
	}

	public void receiveMessage() {
		InitialContext initialContext = null;
		TopicConnectionFactory connectionFactory;
		TopicConnection connection = null;
		TopicSubscriber subscriber;
		TopicSession session = null;
		Topic topic;

		try {
			// Step 1. Create an initial context to perform the JNDI lookup.
			initialContext = new InitialContext();

			// Step 2. Look-up the JMS topic
			topic = (Topic) initialContext.lookup("topic/topicName");

			// Step 3. Look-up the JMS Topic connection factory
			connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");

			// Step 4. Create a JMS Topic connection
			connection = connectionFactory.createTopicConnection();

			// Step 5. Set the client-id on the connection
			// in case of non-durable subscriber, please remove the below line
			connection.setClientID("durable-client-one");

			// step 6. Start the connection
			connection.start();

			// step 7. Create Topic session
			session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

			// step 8. Create the durable subscriber
			// in case of non-durable subscriber, please use the below commented line
			// subscriber=session.createSubscriber(topic);
			subscriber = session.createDurableSubscriber(topic, "durableSubscriber");

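			// Step 9. Consume the message. receive() blocks until a message arrives;
			// use receive(timeoutMillis) if the timeout branch below should be reachable.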
			Message message = subscriber.receive();

			if (message != null && message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				System.out.println(
						"DurableSubscriber received a message published by Publisher : " + textMessage.getText());
			} else if (message == null) {
				System.out.println(
						"DurableSubscriber fails to receive the message sent by the publisher due to a timeout.");
			} else {
				throw new JMSException("Message must be a type of TextMessage");
			}
		} catch (JMSException | NamingException ex) {
			ex.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (initialContext != null) {
				try {
					initialContext.close();
				} catch (NamingException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

Another Subscriber:

package com.roytuts.durable.subscription.jms.activemq.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class DurableSubscriberTwo {

	public static void main(String[] args) {
		DurableSubscriberTwo durableSubscriber = new DurableSubscriberTwo();
		durableSubscriber.receiveMessage();
	}

	public void receiveMessage() {
		InitialContext initialContext = null;
		TopicConnectionFactory connectionFactory;
		TopicConnection connection = null;
		TopicSubscriber subscriber;
		TopicSession session = null;
		Topic topic;

		try {
			// Step 1. Create an initial context to perform the JNDI lookup.
			initialContext = new InitialContext();

			// Step 2. Look-up the JMS topic
			topic = (Topic) initialContext.lookup("topic/topicName");

			// Step 3. Look-up the JMS Topic connection factory
			connectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");

			// Step 4. Create a JMS Topic connection
			connection = connectionFactory.createTopicConnection();

			// Step 5. Set the client-id on the connection
			// in case of non-durable subscriber, please remove the below line
			connection.setClientID("durable-client-two");

			// step 6. Start the connection
			connection.start();

			// step 7. Create Topic session
			session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

			// step 8. Create the durable subscriber
			// in case of non-durable subscriber, please use the below commented line
			// subscriber=session.createSubscriber(topic);
			subscriber = session.createDurableSubscriber(topic, "durableSubscriber");

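			// Step 9. Consume the message. receive() blocks until a message arrives;
			// use receive(timeoutMillis) if the timeout branch below should be reachable.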
			Message message = subscriber.receive();

			if (message != null && message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				System.out.println(
						"DurableSubscriber received a message published by Publisher : " + textMessage.getText());
			} else if (message == null) {
				System.out.println(
						"DurableSubscriber fails to receive the message sent by the publisher due to a timeout.");
			} else {
				throw new JMSException("Message must be a type of TextMessage");
			}
		} catch (JMSException | NamingException ex) {
			ex.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (initialContext != null) {
				try {
					initialContext.close();
				} catch (NamingException e) {
					e.printStackTrace();
				}
			}
		}
	}

}
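Both subscribers pass the same subscription name, "durableSubscriber", but they use different client IDs. In JMS a durable subscription is identified by the client ID together with the subscription name, so the two classes own two separate subscriptions. The sketch below models that identity as a key; `DurableSubscriptionKey` is a hypothetical class for illustration and is not part of JMS or ActiveMQ.

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical model of a durable subscription's identity: (client ID, subscription name).
final class DurableSubscriptionKey {
    private final String clientId;
    private final String subscriptionName;

    DurableSubscriptionKey(String clientId, String subscriptionName) {
        this.clientId = Objects.requireNonNull(clientId);
        this.subscriptionName = Objects.requireNonNull(subscriptionName);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DurableSubscriptionKey)) {
            return false;
        }
        DurableSubscriptionKey other = (DurableSubscriptionKey) o;
        return clientId.equals(other.clientId) && subscriptionName.equals(other.subscriptionName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(clientId, subscriptionName);
    }

    @Override
    public String toString() {
        return clientId + ":" + subscriptionName;
    }

    public static void main(String[] args) {
        Set<DurableSubscriptionKey> subscriptions = new LinkedHashSet<>();
        subscriptions.add(new DurableSubscriptionKey("durable-client-one", "durableSubscriber"));
        subscriptions.add(new DurableSubscriptionKey("durable-client-two", "durableSubscriber"));
        // A restarted DurableSubscriberOne maps onto the subscription it already owns.
        subscriptions.add(new DurableSubscriptionKey("durable-client-one", "durableSubscriber"));

        // prints [durable-client-one:durableSubscriber, durable-client-two:durableSubscriber]
        System.out.println(subscriptions);
    }
}
```

Changing either the client ID or the subscription name therefore creates a new subscription instead of resuming the old one.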

Testing the Application

While your ActiveMQ broker is running, first run the subscriber classes (DurableSubscriberOne and DurableSubscriberTwo), then run MessagePublisher.

Once you run the MessagePublisher class, you will see the output below in the console of each subscriber:

DurableSubscriber received a message published by Publisher : This sample message is consumed by subscriber

Now to test on broker restart, do the following.

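First run the DurableSubscriberOne and DurableSubscriberTwo classes. Stop the broker, then start it again. Now run the MessagePublisher class. You should see the same output as above in each subscriber's console. If a subscriber's connection was lost when the broker stopped, run that subscriber again; a durable subscription is identified by its client ID and subscription name, so the restarted subscriber resumes the same subscription.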
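The jndi.properties file in this tutorial uses a plain tcp:// broker URL, so a client does not reconnect on its own after the broker goes away. One option, shown here as a sketch and assuming the same host and port as above, is ActiveMQ's failover transport, which keeps retrying the connection until the broker is back:

```properties
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
# failover: wraps the original URL so the client reconnects after a broker restart
java.naming.provider.url=failover:(tcp://localhost:61616)
topic.topic/topicName=IN_TOPIC
```

With this URL a subscriber that is waiting in receive() should be able to ride through the broker restart instead of having to be started again.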

Thanks for reading.
