Microservices Asynchronous Communication – Push Notifications

Introduction

The example I will show you here about the asynchronous communication among microservices or how microservices communicate asynchronously among themselves.

A microservices-based application is a distributed system running on multiple processes or services, usually even across multiple servers or hosts. Each service instance is typically a process. Therefore, services must interact using an inter-process communication protocol such as HTTP (Hypertext Transfer Protocol), AMQP (Advanced Message Queuing Protocol), or a binary protocol like TCP (Transmission Control Protocol), depending on the nature of each service.

The client code or message sender just sends the message when required and usually doesn’t wait for a response. It just sends the message as when sending a message to a RabbitMQ queue, Apache Kafka or any other message broker. Microservices typically use messaging protocols for asynchronous communication between microservices.

asynchronous microservices

Another important point to keep in mind while building the microservice based application is the integration mechanism among microservices. You should try to minimize the communication between internal microservices. The fewer calls between microservices, the better. But when you have to integrate microservices, the rule is to communicate among microservices asynchronously. It does not mean that you have to use any specific protocol (for example, asynchronous messaging vs synchronous HTTP). So you need to make sure that the communication between microservices should be done by propagating data asynchronously.

HTTP-Based Communication

In this communication, the client will call the service directly through HTTP protocol and usually the caller will get the service response immediately; either the success response or error response. Usually, the HTTP-based communication is a synchronous communication where the caller will block the next step until the service invocation is completed.

Why synchronous communication is not an ideal solution? The caller or client invokes a service which is down or in poor performance or slow response that leads to a serious performance problem. And obviously caller has to wait for the destination service to be completed. What will happen if hundreds of others hit the caller service? The one solution would be to add a circuit breaker for each service invocation.

The example of synchronous inter-service communication in spring boot based application using Rest Template with circuit breaker can be done as shown below:

@HystrixCommand(fallback = "fallbackMethod")
public void updateStatus(List<UpdateStatistics> updateStatistics) {
	
	restTemplate.postForObject("https://example.com/update-statistics-status", updateStatistics, String.class);
	
}

In addition to synchronous communication, you can also make asynchronous communication in HTTP base communication. There are several frameworks in Java that can be used for this purpose. One solution can be to use the CompletableFuture to wrap the service invocation in one method using @Async as shown below:

OrderService class

@Async
public CompletableFuture<Order> findOrder(String orderId) throws InterruptedException {
	logger.info("Looking up: " + orderId);
	
	String url = String.format("https://example.com/orders/%s", orderId);
	
	Order results = restTemplate.getForObject(url, Order.class);
	
	return CompletableFuture.completedFuture(results);
}

Then you can call the above method multiple times asynchronously:

// Multiple calls, asynchronous lookups
CompletableFuture<Order> order1 = orderService.findOrder("12345");
CompletableFuture<Order> order2 = orderService.findOrder("32451");
CompletableFuture<Order> order3 = orderService.findOrder("63245");

// Wait until they are all done
CompletableFuture.allOf(order1, order2, order3).join();

// Print results, including elapsed time
logger.info("Elapsed time: " + (System.currentTimeMillis() - start));

logger.info("--> " + order1.get());
logger.info("--> " + order2.get());
logger.info("--> " + order3.get());

With asynchronous communication, there is no blocking call when handling service invocation; the caller will get the response through the callback. Unlike synchronous inter-service communication, asynchronous inter-service communication will keep the services isolated from other services in a loosely coupled way.

Messaging-Based Communication

In message-based communication the integration is made loosely coupled and asynchronous. Messaging-based communication would probably be the best choice to make your application resilient and scalable.

In this type communication, you need to use message broker to manage and process the message sent by the producer or publisher; the message can be persisted if required and the message broker will guarantee the delivery of the message.

There are two types of communication using message broker: point-to-point and publish-subscribe.

Point to Point Communication

In point to point communication, a queue is used and a message producer (sender) will send the message to a queue on the message broker and a message consumer (receiver) will consume the message from the queue for further processing. In point to point communication model there are only one producer or sender and one consumer or receiver participate in the messaging system.

Queues retain all messages sent to them until the messages are consumed or until the messages expire.

Publish Subscribe Communication

In publish subscribe messaging model, a topic in broker is used for communication. Publishers and subscribers are generally anonymous and may dynamically publish or subscribe to the content hierarchy. The system takes care of distributing the messages arriving from a topic’s multiple publishers to its multiple subscribers. Topics retain messages only as long as it takes to distribute them to current subscribers.

The following characteristics are found in the publish subscribe communication system:

  • Each message may have multiple consumers.
  • Publishers and subscribers have a timing dependency. A client that subscribes to a topic can consume only messages published after the client has created a subscription, and the subscriber must continue to be active in order for it to consume messages.

In messaging-based communication, the services that consume messages, either from queue or topic, must know the common message structure that is produced or published by producer or publisher.

microservices asynchronous comunication

There are several open-source message brokers that you can use in messaging based communication such as ActiveMQ, RabbitMQ, Apache Kafka, etc. Each of the message brokers has pros and cons depending on the application’s requirements.

I would not explain each of the above mentioned message brokers but you can always read from their official documentations.

Push Notifications

Let’s assume you are designing a notification system for a bank, for example, when a new offer is created you want to send notifications to bank customers based on their eligibility criteria. The notification which you want to send, may be, to their email addresses or mobile numbers or both.

When an offer is created, you need to store it in the database to avoid it loosing. So once your offer is stored then you want to send the notification about this new offer to the bank customers.

In push-based approach, you need to use message queue to send data to clients or customers. In this approach the system will perform two distributed operations. One operation is to store the offer details in the database and the other operation will send the content, also called payload, of the offer to the message queue.

The message queue will asynchronously push data (offer details) to the user’s email or mobile number after receiving the offer data.

Unlike pull-based approach where you need to check the database for periodic intervals for a new offer. There are major disadvantages of poll-based approach:

  • it will consume a lot of bandwidth and put a huge amount of unnecessary load on the database.
  • The notification which is sent to customers will not be in real time until the database is polled.

So, a push-based approach with message queue implementation may improve the application performance and cut down a lot of bandwidth consumption.

But remember if database operation is failed then you should not send the notification to the customers otherwise there will be a discrepancy in the system.

On the other hand, if the push gets failed then what you should do? Should you revoke the database? But even if the message queue push is failed, the data is not lost because it’s already there in the database. So, you can use it from database for another push operation.

Microservices Overview

Let’s talk about the implementation part of the application. This application has three microservices – email service, push service, offer service.

The email service is responsible for sending notification email with offer details to the intended customers.

The push service basically listens to the queue and push data to the customers. The email will be sent to the customer by calling the service asynchronously using CompletableFuture and Spring’s Async mechanism.

The offer service is responsible for creating the offer, storing the offer into the database and send to the queue.

Prerequisites

Java 1.8+, Maven 3.8.2, Spring Cloud 2020.0.4, Spring Boot 2.5.5, Apache Kafka 2.12

Microservice – Offer

The offer microservice creates an offer using the endpoint /offer/create and here is the REST endpoint using Spring Boot framework:

@RestController
public class OfferRestController {

	@Autowired
	private OfferService offerService;

	@PostMapping("/offer/create")
	public ResponseEntity<OfferDto> createOffer(@RequestBody OfferDto offerDto) {

		OfferDto saved = offerService.saveOffer(offerDto);

		if (saved != null) {
			return new ResponseEntity<OfferDto>(saved, HttpStatus.CREATED);
		}

		throw new RuntimeException("Error during saving offer");
	}

}

The corresponding service class that saves offer data in the table through JpaRepository interface and sends to the Kafka topic:

public class OfferService {

	@Value("${topic.name}")
	private String topicName;

	@Autowired
	private OfferRepository offerRepository;

	@Autowired
	private KafkaTemplate<String, OfferDto> kafkaTemplate;

	public OfferDto saveOffer(OfferDto offerDto) {
		Offer offer = offerRepository.saveAndFlush(DtoEntityConverter.offerDtoToOffer(offerDto));

		offerDto = DtoEntityConverter.offerToOfferDto(offer);

		kafkaTemplate.send(topicName, offerDto);

		return offerDto;
	}

}

Don’t worry, you can download the whole source code later from the Source Code section at the bottom of this tutorial.

For this example, I have used H2 in-memory database for storing the offer data.

The application.properties file is placed under class path folder src/main/resources folder:

spring.application.name=offer-service
server.port=6000

eureka.client.service-url.default-zone=http://localhost:8761/eureka/

#Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=roytuts
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.roytuts.offer.serializer.OfferSerializer
topic.name=offers.in

#Datasource
spring.datasource.url=jdbc:h2:mem:roytuts
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=

spring.jpa.show-sql = true

spring.h2.console.enabled = true
spring.h2.console.path=/h2/console/

spring.jpa.hibernate.ddl-auto = create

In the Kafka configuration section, notice that I have created my own serializer (OfferSerializer) class otherwise it will throw Serialization exception because, by default, only string type of message gets serialized.

public class OfferSerializer implements Serializer<OfferDto> {

	@Override
	public void close() {

	}

	@Override
	public byte[] serialize(String s, OfferDto offerDto) {
		byte[] retVal = null;
		ObjectMapper objectMapper = new ObjectMapper();

		try {
			retVal = objectMapper.writeValueAsString(offerDto).getBytes();
		} catch (Exception e) {
			e.printStackTrace();
		}

		return retVal;
	}

}

I have also create entity to dto or dto to entity converter class. It is not a good practice to use entity class as a response body and return to your clients.

public final class DtoEntityConverter {

	private DtoEntityConverter() {
	}

	public static Offer offerDtoToOffer(OfferDto offerDto) {
		Offer offer = new Offer();
		offer.setId(offerDto.getId());
		offer.setCode(offerDto.getCode());
		offer.setDescription(offerDto.getDescription());

		return offer;
	}

	public static OfferDto offerToOfferDto(Offer offer) {
		OfferDto offerDto = new OfferDto();
		offerDto.setId(offer.getId());
		offerDto.setCode(offer.getCode());
		offerDto.setDescription(offer.getDescription());

		return offerDto;
	}

}

The Kafka topic configuration is done through the following class:

@Configuration
public class Config {

	@Value("${topic.name}")
	private String topicName;

	@Bean
	public NewTopic newTopic() {
		NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);

		return newTopic;
	}

}

Microservice – Push

The push service will listen to the topic where offer gets published and send it to customers through email service.

@Service
public class PushService {

	@Autowired
	private RestTemplate restTemplate;

	@KafkaListener(topics = "${topic.name}", autoStartup = "true")
	public void sendNotification(ConsumerRecord<String, OfferDto> record) {

		OfferDto offerDto = record.value();

		System.out.println("offerDto: " + offerDto);

		try {
			// Send Email
			final EmailDto emailDto = new EmailDto();
			emailDto.setEmailFromAddress("roytuts@localhost.com");
			emailDto.setEmailReplyToAddress("no-reply@localhost.com");
			emailDto.setRecipient("roytuts@localhost.com");
			emailDto.setSubject("Save Cash On Transactions");
			emailDto.setContent(offerDto.getDescription());

			callEmailService(emailDto);
		} catch (Exception ex) {
			System.out.println("Ex: " + ex.getMessage());
		}

	}

	@Async
	public CompletableFuture<Void> callEmailService(EmailDto emailDto) {
		System.out.println("Calling Email Service...");

		final String msgServiceUrl = "http://localhost:8000/email/send";

		final Void response = restTemplate.postForObject(msgServiceUrl, emailDto, Void.class);

		return CompletableFuture.completedFuture(response);
	}
}

To execute the task or call APIs asynchronously, you need to configure async using Spring framework:

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

	@Override
	public Executor getAsyncExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(10);
		executor.setMaxPoolSize(25);
		executor.setQueueCapacity(100);
		executor.initialize();
		return executor;
	}

	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {

		return new AsyncUncaughtExceptionHandler() {
			@Override
			public void handleUncaughtException(Throwable ex, Method method, Object... params) {
				System.out.println("Exception message - " + ex.getMessage());
				System.out.println("Method name - " + method.getName());

				for (Object param : params) {
					System.out.println("Parameter value - " + param);
				}
			}
		};
	}

}

The application.properties file is kept under class path folder src/main/resources with the following content:

spring.application.name=push-service
server.port=7000

eureka.client.service-url.default-zone=http://localhost:8761/eureka/

#Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=roytuts
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.roytuts.push.serializer.OfferDeserializer
topic.name=offers.in

The consumer group name is required for the consumer. Here also I have created deserializer (OfferDeserializer) class to create the object from the Kafka message.

public class OfferDeserializer implements Deserializer<OfferDto> {

	@Override
	public void close() {

	}

	@Override
	public OfferDto deserialize(String s, byte[] bytes) {
		ObjectMapper mapper = new ObjectMapper();
		OfferDto offerDto = null;

		try {
			offerDto = mapper.readValue(bytes, OfferDto.class);
		} catch (Exception e) {
			e.printStackTrace();
		}

		return offerDto;
	}

}

The RestTemplate configuration is done through the following class:

@Configuration
public class Config {

	@Bean
	public RestTemplate restTemplate() {
		return new RestTemplate();
	}

}

Microservice – Email

The email microservice is responsible for sending emails to the customers. For testing purpose I am using here FakeSMTP server which is a Java based interface. You can also download and use it free of cost for testing purpose in your development environment.

The application.properties file with the following configurations is put under the class path folder src/main/resources.

spring.application.name=email-service
server.port=8000

eureka.client.service-url.default-zone=http://localhost:8761/eureka/

#mailhog
spring.mail.protocol=smtp
spring.mail.host=localhost
spring.mail.port=2525
spring.mail.properties.mail.smtp.auth=false
spring.mail.properties.mail.smtp.starttls.enable=false

Here you see that I have used localhost for SMTP host and 2525 port for the SMTP port and I have disabled authentication for sending emails. I have used FakeSMTP server to test my application.

The following code is responsible for sending emails or push notifications to the intended customers.

@Service
public class EmailService {

	@Autowired
	private JavaMailSender emailSender;

	public void sendEmail(String recipient, String emailFromAddress, String emailReplyToAddress, String subject,
			String content) throws MailException {
		MimeMessagePreparator messagePreparator = mimeMessage -> {
			MimeMessageHelper messageHelper = new MimeMessageHelper(mimeMessage, true);
			messageHelper.setFrom(emailFromAddress);
			messageHelper.setReplyTo(emailReplyToAddress);
			messageHelper.setTo(recipient);
			messageHelper.setSubject(subject);
			messageHelper.setText(content);
		};

		System.out.println("Sending Email ...");

		emailSender.send(messagePreparator);
		
		System.out.println("Email Sent!");
	}

}

Service – Eureka

The eureka server app configures the eureka server where your microservices get registered and found in the eureka registry.

It has just a main class with @EnableEurekaServer annotation:

@EnableEurekaServer
@SpringBootApplication
public class ServerApp {

	public static void main(String[] args) {
		SpringApplication.run(ServerApp.class, args);
	}

}

Remember this application is just an example how to design push notification system and how microservices communicate asynchronously among themselves through Apache Kafka.

Testing the Microservices’ Asynchronous Communication

Make sure you start the zookeeper and kafka servers first followed by microservices/eureka server app.

Microservices Status

You will find all your microservices are up at http://localhost:8761:

asynchronous communication among microservices

Create An Offer

Create an offer using the endpoint /offer/create of http://locahost:6000. I have used Postman tool to create an offer. You will also receive the created offer details in the response.

push notifications system design

Email On Fake SMTP Server

You will receive email on fake smtp server as shown below. You can also check other tabs – SMTP log and Last message to see more details about the message.

microservices asynchronous communication

Console Output Of Push Service

offerDto: OfferDto [id=1, code=CASHBK, description=Get 5% cashback on 5000 spent]
Calling Email Service...

Console Output Of Email Service

emailContent: EmailContent [recipient=roytuts@localhost.com, emailFromAddress=roytuts@localhost.com, emailReplyToAddress=no-reply@localhost.com, subject=Save Cash On Transactions, content=Get 5% cashback on 5000 spent]
Sending Email ...
Email Sent!

Hope You got an idea how asynchronous communication among microservices occur and how to build a push notification system using Spring Boot and Apache Kafka.

Source Code

Download

Leave a Reply

Your email address will not be published. Required fields are marked *