Event-Driven Streaming using Spring Cloud Stream and Apache Kafka

In this tutorial we will see an example of event-driven streaming using Spring Cloud Stream and the Apache Kafka streaming platform. How do we deal with events such as a new user registering on a portal, an order being placed, or a file being uploaded? Let’s say an order was placed: we then need a call to process the payment, a call to reserve inventory, and a call to begin packaging, picking and shipping the product.

For a few orders it’s not a big deal: we can call the backend services directly without introducing too much overhead. But suppose we have to handle 100 such orders a second; then we have to make 3 × 100 = 300 calls to backend services per second. If we add another service, for example reporting for our sales tracking, that becomes 400 calls to our backend services per second. That is too much overhead.

What if, instead, our website could simply alert our whole architecture at once? It can yell, “Hey! I made a sale” to our whole stack, and any component that’s interested can take the appropriate action. This means we don’t need to update our frontend as we add additional services; our new services just need to know what to listen for.

The above is an example of an event-driven architecture, where instead of reaching out to each service one by one, our services emit a change of state. This is where Spring Cloud Stream comes into the picture.

In this example we will use Apache Kafka as the event streaming platform.

Prerequisites

Eclipse 2019-12, Apache Kafka 2.12-2.3.1, Gradle 6.1.1, Maven 3.6.3, Java at least 8, Spring Cloud Starter Stream Kafka 3.0.2, Spring Boot 2.2.4

Create Project

This example consists of two microservices, one to produce events and one to consume them.

The producer project name is spring-cloud-stream-kafka-ordersource and the consumer project name is spring-cloud-stream-kafka-ordercheck.

If you are creating a Gradle-based project in Eclipse then you can use the below build.gradle script:

buildscript {
	ext {
		springBootVersion = '2.2.4.RELEASE'
	}
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

plugins {
    id 'java-library'
    id 'org.springframework.boot' version '2.2.4.RELEASE'
}

sourceCompatibility = 12
targetCompatibility = 12

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter:${springBootVersion}")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.2.RELEASE")
}

If you are creating a Maven-based project then you can use the below pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-cloud-stream-kafka-ordersource</artifactId> <!-- use spring-cloud-stream-kafka-ordercheck for the consumer -->
	<version>0.0.1-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.4.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
			<version>3.0.2.RELEASE</version>
		</dependency>
	</dependencies>

    <build>
        <plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.1</version>
				<configuration>
					<source>12</source>
					<target>12</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Remember that the above build.gradle or pom.xml file can be used for both of your microservices; you just need to adjust the name of the project in your settings.gradle or pom.xml file accordingly.
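For example, in a Gradle build the project name goes into settings.gradle (shown here for the producer; use the consumer name for the other service):

rootProject.name = 'spring-cloud-stream-kafka-ordersource'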

Microservice – Producer

In our fictional scenario, the message producer will create a stream of orders, and our processor will check if those orders should be delivered or not (undelivered).

Let’s start by producing some messages that will be sent to Kafka.

The below code is written into the OrderSourceApp.java file, which actually produces the messages.

Supplier<T> is a Java functional interface. When there is only one @Bean method that returns this type, Spring Cloud Stream can bind it as a function bean: by default it triggers the function once every second and sends the result to the MessageChannel named supply-out-0 (constructed from the method name followed by -out-0). In our case, however, we have bound the poller to the output channel using @InboundChannelAdapter(channel = Source.OUTPUT), so the messages are sent to the output channel instead.

We have put @EnableBinding(Source.class) on the application to get the output channel binding from the Source interface, which Spring Cloud Stream already provides.

package com.roytuts.spring.cloud.stream.using.kafka;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;

import com.roytuts.spring.cloud.stream.using.kafka.model.Order;

@SpringBootApplication
@EnableBinding(Source.class)
public class OrderSourceApp {

	private static final Logger LOG = LoggerFactory.getLogger(OrderSourceApp.class);

	private List<String> names = Arrays.asList("Donald", "Theresa", "Vladimir", "Angela", "Emmanuel", "Shinzō",
			"Jacinda", "Kim");

	private List<String> items = Arrays.asList("Mobile", "Tab", "Desktop", "Laptop", "Head Phone", "Adapter", "Charger",
			"USB Cable", "Watch", "Clock");

	public static void main(String[] args) {
		SpringApplication.run(OrderSourceApp.class, args/* "--spring.cloud.stream.function.definition=supply" */);
	}

	@Bean
	@InboundChannelAdapter(channel = Source.OUTPUT)
	public Supplier<Order> supply() {
		return () -> {
			String rName = names.get(new Random().nextInt(names.size()));
			String rItem = items.get(new Random().nextInt(items.size()));
			Order order = new Order(UUID.randomUUID().toString(), rName, rItem);

			LOG.info("{} {} for {} for {}", order.getStatus(), order.getUuid(), order.getItem(), order.getName());

			return order;
		};
	}

}

In the above class we are using randomly chosen hard-coded values as the source of data; ideally your data would come from another application, a database or some other persistent storage.
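As a rough sketch only (OrderRepository and its findNextPending() method are hypothetical and not part of this example), a Supplier backed by persistent storage could look like this:

@Bean
@InboundChannelAdapter(channel = Source.OUTPUT)
public Supplier<Order> supplyFromDatabase(OrderRepository repository) { // hypothetical repository
	// each poll pulls the next pending order from storage instead of generating a random one
	return () -> repository.findNextPending();
}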

If we have multiple beans then we can use the property spring.cloud.stream.function.definition to explicitly declare which function bean we want to bind to the destination.

If you want to use the property spring.cloud.stream.function.definition in the application.properties file, then you can put spring.cloud.stream.function.definition=supply, where supply is the method name. If you have multiple function beans then you can declare them separated by a semicolon (;), for example spring.cloud.stream.function.definition=supply;foo, whereas the pipe (|) separator composes several functions into one.

If you want to set it in the main method instead, you can do so as follows:

SpringApplication.run(OrderSourceApp.class, "--spring.cloud.stream.function.definition=supply");

For multiple function beans,

SpringApplication.run(OrderSourceApp.class, "--spring.cloud.stream.function.definition=supply;foo");

By default Spring Cloud Stream will produce a message every second. If you want a different polling interval, you can use the spring.cloud.stream.poller.fixed-delay property, either in the application.properties file or as an argument to the Spring runner main method.
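For example, an application.properties along these lines declares the function to bind and slows the poller down (the 5000 ms delay is just an illustrative value):

# bind the supply() function bean explicitly
spring.cloud.stream.function.definition=supply
# poll every 5 seconds instead of the default 1 second
spring.cloud.stream.poller.fixed-delay=5000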

The dependency spring-cloud-starter-stream-kafka tells Spring that we want to send messages to Kafka. Since our Kafka server is listening on localhost on the default port, we don’t need to provide any additional configuration in our application.properties file. If you need to customize the configuration, refer to the Spring Cloud Stream Kafka binder documentation.
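For instance, if the broker were running somewhere other than localhost:9092, a binder override like the following could be added to application.properties (the host and port here are placeholders):

# point the Kafka binder at a non-default broker
spring.cloud.stream.kafka.binder.brokers=my-kafka-host:9092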

Order is a POJO class and Status is an enum that holds the constant values for the order status.

package com.roytuts.spring.cloud.stream.using.kafka.model;

import com.roytuts.spring.cloud.stream.using.kafka.enums.Status;

public class Order {

	private String uuid, name, item, status;

	public Order() {
		this.setStatus(Status.PENDING.name());
	}

	public Order(String uuid, String name, String item) {
		this.uuid = uuid;
		this.name = name;
		this.item = item;
		this.setStatus(Status.PENDING.name());
	}

	public String getUuid() {
		return uuid;
	}

	public void setUuid(String uuid) {
		this.uuid = uuid;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getItem() {
		return item;
	}

	public void setItem(String item) {
		this.item = item;
	}

	public String getStatus() {
		return status;
	}

	public void setStatus(String status) {
		if (status.equals(Status.DELIVERED.name()) || status.equals(Status.UNDELIVERED.name())
				|| status.equals(Status.PENDING.name()) || status.equals(Status.REJECTED.name())) {
			this.status = status;
		} else {
			throw new IllegalArgumentException("Cannot set the Order's status to " + status);
		}
	}

	@Override
	public String toString() {
		return "Order [uuid=" + uuid + ", name=" + name + ", item=" + item + ", status=" + status + "]";
	}

}

And Status enum is:

package com.roytuts.spring.cloud.stream.using.kafka.enums;

public enum Status {

	DELIVERED, UNDELIVERED, PENDING, REJECTED

}

Microservice – Consumer

We have produced the events in our earlier application; now we need something to consume and process them. For this, our order checker will inspect every order and mark it as delivered or undelivered. If it can be delivered, a delivered message is sent to the delivered topic; otherwise, an undelivered message is sent to the undelivered topic. Other systems down the line could listen for and pick up these messages for further processing.

The code here is only slightly different, mainly pointing to different topics. In OrderCheckApp.java we have the @EnableBinding(OrderProcessor.class) annotation, meaning that all of our channel binding definitions are found in the OrderProcessor class.

In our OrderProcessor.java file, we define that the MessageChannel we listen on is named output, matching the topic our producer writes to. Additionally, we define two other MessageChannels that we write to, delivered and undelivered. Which method is invoked when a message arrives on the input channel is defined separately, via @StreamListener in the OrderChecker class shown further below.

package com.roytuts.spring.cloud.stream.kafka.ordercheck.processor;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface OrderProcessor {

	String ORDERS_IN = "output";
	String DELIVERED_OUT = "delivered";
	String UNDELIVERED_OUT = "undelivered";

	@Input(ORDERS_IN)
	SubscribableChannel sourceOfOrders();

	@Output(DELIVERED_OUT)
	MessageChannel delivered();

	@Output(UNDELIVERED_OUT)
	MessageChannel undelivered();

}
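By default each binding uses a Kafka topic with the same name as the channel. If you wanted the bindings to map to differently named topics, properties along the following lines could be added to the consumer’s application.properties (the topic names here are just examples):

spring.cloud.stream.bindings.output.destination=orders
spring.cloud.stream.bindings.delivered.destination=delivered-orders
spring.cloud.stream.bindings.undelivered.destination=undelivered-orders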

Finally, we can see how this ties into which method is invoked by looking at the OrderChecker.java file. It has a method checkAndSortOrders() with a @StreamListener annotation that matches the Input we defined previously:

package com.roytuts.spring.cloud.stream.kafka.ordercheck.checker;

import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import com.roytuts.spring.cloud.stream.kafka.ordercheck.enums.Status;
import com.roytuts.spring.cloud.stream.kafka.ordercheck.model.Order;
import com.roytuts.spring.cloud.stream.kafka.ordercheck.processor.OrderProcessor;

@Component
public class OrderChecker {

	public static final Logger LOG = LoggerFactory.getLogger(OrderChecker.class);

	private static final List<String> DISCARDED_ITEMS = Arrays.asList("Watch", "Clock");

	@Autowired
	private OrderProcessor processor;

	@StreamListener(OrderProcessor.ORDERS_IN)
	public void checkAndSortOrders(Order order) {
		LOG.info("{} {} for {} for {}", order.getStatus(), order.getUuid(), order.getItem(), order.getName());

		if (DISCARDED_ITEMS.contains(order.getItem())) {
			order.setStatus(Status.UNDELIVERED.name());
			processor.undelivered().send(message(order));
		} else {
			order.setStatus(Status.DELIVERED.name());
			processor.delivered().send(message(order));
		}

		LOG.info("{} {} for {} for {}", order.getStatus(), order.getUuid(), order.getItem(), order.getName());
	}

	private static final <T> Message<T> message(T val) {
		return MessageBuilder.withPayload(val).build();
	}

}

The main class to deploy our application:

package com.roytuts.spring.cloud.stream.kafka.ordercheck;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

import com.roytuts.spring.cloud.stream.kafka.ordercheck.processor.OrderProcessor;

@SpringBootApplication
@EnableBinding(OrderProcessor.class)
public class OrderCheckApp {

	public static final Logger LOG = LoggerFactory.getLogger(OrderCheckApp.class);

	public static void main(String[] args) {
		SpringApplication.run(OrderCheckApp.class, args);

		LOG.info("The Order Check Application has started...");
	}

}

The POJO class Order and the enum Status are the same as the ones we created for the earlier application.

We cannot run both applications on the same port 8080, so we need to change the port for our order check application. Put the below configuration into the application.properties file on the classpath.

server.port=8081

Spring Cloud Stream provides an extremely powerful abstraction over potentially complicated messaging platforms, turning the act of producing messages into just a couple of lines of code.

Testing the Application

Make sure your Apache Kafka server is running.

Now run both microservices.
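If Kafka is not already running, the scripts shipped with the Kafka 2.3.1 distribution can be used to start ZooKeeper and then the broker (paths shown for the Linux/macOS scripts; on Windows use the .bat equivalents under bin\windows):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties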

The order source application produces the below messages every second:

18.106  INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING abd3c9c3-653e-4948-b477-42d8b3608281 for Mobile for Emmanuel
19.338  INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING b7e55aa9-6092-484c-8fd0-c805dc20072c for Head Phone for Kim
20.341  INFO 15768 --- [ask-scheduler-2] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING e272945a-c24f-43d6-85f8-268809e55c11 for Laptop for Angela
21.344  INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 20b86bad-908a-4ea6-a5fc-9d51b5dad3f6 for Tab for Angela
22.347  INFO 15768 --- [ask-scheduler-3] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 1db088dd-f175-4503-8cab-8bf5a2b54386 for USB Cable for Kim
23.349  INFO 15768 --- [ask-scheduler-2] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 701af728-630a-4208-9e8c-2274737b196f for Laptop for Emmanuel
24.351  INFO 15768 --- [ask-scheduler-4] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 3b4aeb05-a5ec-49bb-8429-9726d0a9bbb4 for Adapter for Kim
25.353  INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING ef15ca4a-9105-482a-9acf-c0ce9c5f5617 for Charger for Angela
26.355  INFO 15768 --- [ask-scheduler-5] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 5dff9d7f-536b-4215-bd08-4787dc4eab63 for Clock for Vladimir
27.357  INFO 15768 --- [ask-scheduler-3] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 797dea84-cbf9-4924-b35d-6584ffff805a for Laptop for Angela
28.360  INFO 15768 --- [ask-scheduler-6] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING ea2bc331-b390-46e9-a67a-43ebc4c41625 for USB Cable for Emmanuel
29.362  INFO 15768 --- [ask-scheduler-6] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 9fd5b7fc-69f9-4dd2-b670-4eacd70a3629 for Head Phone for Emmanuel
30.364  INFO 15768 --- [ask-scheduler-7] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 0484dd0e-f842-485d-bc6e-199de7ddef92 for Watch for Kim
31.366  INFO 15768 --- [ask-scheduler-4] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 9798edf4-da37-44e6-aca9-77c10f41939b for Clock for Angela
32.369  INFO 15768 --- [ask-scheduler-8] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING d47b878e-2c4c-4dea-91c4-3e7e98946c4b for Laptop for Kim
33.372  INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 09960737-86a5-4c63-b4ce-4b0bcf0a6c0f for Head Phone for Vladimir
34.375  INFO 15768 --- [ask-scheduler-1] c.r.s.c.s.using.kafka.OrderSourceApp     : PENDING 3cc5261b-50e3-447e-a528-526eb8608fcc for Charger for Emmanuel

The order check application consumes and processes these messages:

18.584  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING abd3c9c3-653e-4948-b477-42d8b3608281 for Mobile for Emmanuel
18.676  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED abd3c9c3-653e-4948-b477-42d8b3608281 for Mobile for Emmanuel
19.347  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING b7e55aa9-6092-484c-8fd0-c805dc20072c for Head Phone for Kim
19.348  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED b7e55aa9-6092-484c-8fd0-c805dc20072c for Head Phone for Kim
20.348  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING e272945a-c24f-43d6-85f8-268809e55c11 for Laptop for Angela
20.350  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED e272945a-c24f-43d6-85f8-268809e55c11 for Laptop for Angela
21.350  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 20b86bad-908a-4ea6-a5fc-9d51b5dad3f6 for Tab for Angela
21.352  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED 20b86bad-908a-4ea6-a5fc-9d51b5dad3f6 for Tab for Angela
22.353  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 1db088dd-f175-4503-8cab-8bf5a2b54386 for USB Cable for Kim
22.354  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED 1db088dd-f175-4503-8cab-8bf5a2b54386 for USB Cable for Kim
23.354  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 701af728-630a-4208-9e8c-2274737b196f for Laptop for Emmanuel
23.355  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED 701af728-630a-4208-9e8c-2274737b196f for Laptop for Emmanuel
24.357  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 3b4aeb05-a5ec-49bb-8429-9726d0a9bbb4 for Adapter for Kim
24.358  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED 3b4aeb05-a5ec-49bb-8429-9726d0a9bbb4 for Adapter for Kim
25.360  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING ef15ca4a-9105-482a-9acf-c0ce9c5f5617 for Charger for Angela
25.361  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED ef15ca4a-9105-482a-9acf-c0ce9c5f5617 for Charger for Angela
26.361  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 5dff9d7f-536b-4215-bd08-4787dc4eab63 for Clock for Vladimir
26.377  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : UNDELIVERED 5dff9d7f-536b-4215-bd08-4787dc4eab63 for Clock for Vladimir
27.363  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 797dea84-cbf9-4924-b35d-6584ffff805a for Laptop for Angela
27.363  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED 797dea84-cbf9-4924-b35d-6584ffff805a for Laptop for Angela
28.365  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING ea2bc331-b390-46e9-a67a-43ebc4c41625 for USB Cable for Emmanuel
28.366  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED ea2bc331-b390-46e9-a67a-43ebc4c41625 for USB Cable for Emmanuel
29.367  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 9fd5b7fc-69f9-4dd2-b670-4eacd70a3629 for Head Phone for Emmanuel
29.368  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : DELIVERED 9fd5b7fc-69f9-4dd2-b670-4eacd70a3629 for Head Phone for Emmanuel
30.369  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : PENDING 0484dd0e-f842-485d-bc6e-199de7ddef92 for Watch for Kim
30.370  INFO 16156 --- [container-0-C-1] c.r.s.c.s.k.o.checker.OrderChecker       : UNDELIVERED 0484dd0e-f842-485d-bc6e-199de7ddef92 for Watch for Kim

That’s all.


Thanks for reading.
