ActiveMQ publish/subscribe messaging domain

This tutorial will show you how we can send a message to Topic using publish/subscribe messaging system in Apache ActiveMQ. For more information on publish/subscribe messaging system please read tutorial https://roytuts.com/configure-jms-client-using-glassfish-3/

Before you moving forward please read the tutorial https://roytuts.com/apache-activemq-configuration-in-windows/ for configuring ActiveMQ but do not create any Topic.

Now we will look into the following steps in order to implement publish/subscribe messaging system.

1. Create a class called MessagePublisherOne that will produce message or send message to the destination – Topic.

package com.roytuts.jms.publisher;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.roytuts.jms.constants.JmsConstants;
public class PublisherOne {
	private ConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private javax.jms.MessageProducer messageProducer;
	public void publishMessage(final String message) {
		try {
			// get the ConnectionFactory
			// default broker URL is tcp://localhost:61616
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_BROKER_URL);
			// create a Connection object
			connection = connectionFactory.createConnection();
			// start the connection
			connection.start();
			// create a Session object
			// transaction false
			// auto acknowledgment sent when sending or receiving a message
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// create a destination - Queue or Topic - where message will be
			// sent or received from
			Destination destination = session
					.createTopic(JmsConstants.TOPIC_NAME);
			// create a Producer who will send the message
			messageProducer = session.createProducer(destination);
			// create a TextMessage, there are many more message types
			TextMessage textMessage = session.createTextMessage(message);
			// send the message
			messageProducer.send(textMessage);
			System.out.println("Publishing Message : " + textMessage);
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			try {
				messageProducer.close();
				session.close();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

2. Create a class called MessageSubscriberOne that will receive message from the destination – Topic.

package com.roytuts.jms.subscriber;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.roytuts.jms.constants.JmsConstants;
public class SubscriberOne {
	private ConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private javax.jms.MessageConsumer messageConsumer;
	public void consumeMessage() {
		try {
			// get the ConnectionFactory
			// default broker URL is tcp://localhost:61616
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_BROKER_URL);
			// create a Connection object
			connection = connectionFactory.createConnection();
			// start the connection
			connection.start();
			// create a Session object
			// transaction false
			// auto acknowledgment sent when sending or receiving a message
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// create a destination - Queue or Topic - where message will be
			// sent or received from
			Destination destination = session
					.createTopic(JmsConstants.TOPIC_NAME);
			// create a Producer who will send the message
			messageConsumer = session.createConsumer(destination);
			// consume message sent by Producer
			Message message = messageConsumer.receive();
			// check whether it is an instance of TextMessage
			// because we have let Producer send TextMessage
			// there may be other message type
			if (message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				System.out.println("Got message published by Publisher : "
						+ textMessage);
			} else {
				throw new JMSException("Message must be a type of TextMessage");
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			try {
				messageConsumer.close();
				session.close();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

3. Create a class called MessageSubscriberTwo that will receive message from the destination – Topic.

package com.roytuts.jms.subscriber;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.roytuts.jms.constants.JmsConstants;
public class SubscriberTwo {
	private ConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private javax.jms.MessageConsumer messageConsumer;
	public void consumeMessage() {
		try {
			// get the ConnectionFactory
			// default broker URL is tcp://localhost:61616
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_BROKER_URL);
			// create a Connection object
			connection = connectionFactory.createConnection();
			// start the connection
			connection.start();
			// create a Session object
			// transaction false
			// auto acknowledgment sent when sending or receiving a message
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// create a destination - Queue or Topic - where message will be
			// sent or received from
			Destination destination = session
					.createTopic(JmsConstants.TOPIC_NAME);
			// create a Producer who will send the message
			messageConsumer = session.createConsumer(destination);
			// consume message sent by Producer
			Message message = messageConsumer.receive();
			// check whether it is an instance of TextMessage
			// because we have let Producer send TextMessage
			// there may be other message type
			if (message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				System.out.println("Got message published by Publisher : "
						+ textMessage);
			} else {
				throw new JMSException("Message must be a type of TextMessage");
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			try {
				messageConsumer.close();
				session.close();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

4. Create a class called JmsConstants that will hold all constants related to JMS.

package com.roytuts.jms.constants;
public class JmsConstants {
	private JmsConstants() {
	}
	public static final String TOPIC_NAME = "IN_TOPIC";
}

6. Create a main class that will run the MessagePublisherOne.

package com.roytuts.jms.publisher;
public class PublisherOneTest {
	public static void main(String[] args) {
		new PublisherOne()
				.publishMessage("This message was published by PublisherOne");
	}
}

7. Create a main class that will run the MessageSubscriberOne.

package com.roytuts.jms.subscriber;
public class SubscriberOneTest {
	public static void main(String[] args) {
		new SubscriberOne().consumeMessage();
	}
}

8. Create a main class that will run the MessageSubscriberTwo.

package com.roytuts.jms.subscriber;
public class SubscriberTwoTest {
	public static void main(String[] args) {
		new SubscriberTwo().consumeMessage();
	}
}

9. Go to the ActiveMQ Web Console and click on Topics. You will see there are two consumers in the IN_TOPIC. In order to receive message by message subscriber(s), the message subscriber(s) must run before the message publisher.

Apache ActiveMQ publish subscribe messaging domain

10. Run the SubscriberOneTest. The below output in the console you will receive as soon as you run the PublisherOneTest.

Output 

Got message published by Publisher : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:admin-PC-49502-1429415952406-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:admin-PC-49502-1429415952406-1:1:1:1, destination = topic://IN_TOPIC, transactionId = null, expiration = 0, timestamp = 1429415952624, arrival = 0, brokerInTime = 1429415952625, brokerOutTime = 1429415952627, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6824be, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = This message was published by PublisherOne}

11. Run the SubscriberTwoTest. The below output in the console you will receive as soon as you run the PublisherOneTest.

Output 

Got message published by Publisher : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:admin-PC-49502-1429415952406-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:admin-PC-49502-1429415952406-1:1:1:1, destination = topic://IN_TOPIC, transactionId = null, expiration = 0, timestamp = 1429415952624, arrival = 0, brokerInTime = 1429415952625, brokerOutTime = 1429415952633, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6824be, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = This message was published by PublisherOne}

12. Run the PublisherOneTest.
Output 

Publishing Message : ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:admin-PC-49456-1429415760292-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://IN_TOPIC, transactionId = null, expiration = 0, timestamp = 1429415760729, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = This message was published by PublisherOne}

13. Now again click on Topics in ActiveMQ Web Console. You will not see any message in the IN_TOPIC because as soon as you ran the MessagePublisherOne the message has been consumed by MessageSubscriberOne and MessageSubscriberTwo and now you will see one message was enqueued but two messages has been dequeued because there were two message subscribers.

Apache ActiveMQ publish subscribe messaging domain

Here is the pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.roytuts</groupId>
	<artifactId>apache-activemq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>apache-activemq</name>
	<url>http://maven.apache.org</url>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<jdk.version>1.8</jdk.version>
		<junit.version>4.11</junit.version>
		<slf4j.version>1.7.5</slf4j.version>
		<activemq.version>5.11.1</activemq.version>
		<spring.version>4.1.5.RELEASE</spring.version>
	</properties>
	<dependencies>
		<!-- activemq -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>${activemq.version}</version>
		</dependency>
		<!-- junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>${jdk.version}</source>
					<target>${jdk.version}</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

 
That’s all. Thank you for reading.

Leave a Reply

Your email address will not be published. Required fields are marked *