Spring Boot Functional Reactive Programming Example

Reactive Programming

This tutorial will show you an example of Spring Boot Functional Reactive Programming (FRP).


What are Reactive and Functional Reactive Programming?

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.

In other words, Reactive Programming is a style of micro-architecture involving intelligent routing and consumption of events, all combining to change behaviour.

Simply put, it is about non-blocking applications that are asynchronous and event-driven and require a small number of threads to scale vertically (i.e. within the JVM) rather than horizontally (i.e. through clustering).

Functional Reactive Programming (FRP) is a programming paradigm for reactive programming (asynchronous dataflow programming) using the building blocks of functional programming.

Use Cases Of Reactive Programming?

1. A service that calls backend services. Most backend services are RESTful APIs that operate over HTTP, and they are usually called in a blocking, synchronous way.

Implementations of such services often call other services, and then more services depending on the results of the earlier calls.

With so much I/O going on, if you wait for each call to complete before you send the next request, your client may get “disconnected/timed out” before you manage to assemble the reply.

FRP offers the promise of “composability” of the logic, which helps optimize the complex orchestration of dependencies between service calls and makes the calling service easier to write.

2. Highly concurrent message consumers. Message processing systems are a common enterprise use case.

When you have to process millions of messages per second and performance matters, Reactive patterns deserve attention. They fit naturally with message processing, since an event translates easily into a message.

3. An abstraction over asynchronous code. As long as developers accept an extra layer of abstraction, they can largely forget whether the code they are calling is synchronous or asynchronous, which saves the precious brain cells that asynchronous programming otherwise costs. Reactive Programming is not the only solution, but FRP tools are useful for this issue.
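The composability mentioned in the first use case can be sketched with the JDK's own CompletableFuture instead of a reactive library. This is only an illustration under assumptions: findOwner, findEmail and findTitle are hypothetical stand-ins for remote service calls and simply compute values in memory.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical service calls, simulated in memory; none of these names come from a real API.
class ComposedCalls {

	// Pretend remote call: look up the owner of a website.
	static CompletableFuture<String> findOwner(int webSiteId) {
		return CompletableFuture.supplyAsync(() -> "owner-" + webSiteId);
	}

	// Pretend remote call that depends on the result of findOwner.
	static CompletableFuture<String> findEmail(String owner) {
		return CompletableFuture.supplyAsync(() -> owner + "@example.com");
	}

	// Pretend remote call that is independent of the other two.
	static CompletableFuture<String> findTitle(int webSiteId) {
		return CompletableFuture.supplyAsync(() -> "title-" + webSiteId);
	}

	// Chain the dependent calls and combine them with the independent one,
	// without blocking a thread between the steps.
	static CompletableFuture<String> describe(int webSiteId) {
		CompletableFuture<String> email = findOwner(webSiteId).thenCompose(ComposedCalls::findEmail);
		return email.thenCombine(findTitle(webSiteId), (e, t) -> t + " <" + e + ">");
	}

	public static void main(String[] args) {
		// join() blocks only here, at the edge of the program, to print the result.
		System.out.println(describe(1).join()); // prints "title-1 <owner-1@example.com>"
	}
}
```

The dependent call is expressed with thenCompose and the independent one with thenCombine, so the orchestration reads as one declarative pipeline. Reactor's Mono offers comparable operators.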

Reactive Programming in Java

Java itself did not include a standard Reactive API until version 9, which added the java.util.concurrent.Flow interfaces. However, Java being the powerhouse of enterprise application development, there are a lot of libraries that provide Reactive programming on top of the JDK, such as RxJava, Reactor, Spring Framework 5.0 and Ratpack, many of them built on the Reactive Streams specification.
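As a rough sketch of what the JDK offers on its own, the example below uses java.util.concurrent.Flow and SubmissionPublisher, both added in Java 9. The class and method names (FlowDemo, CollectingSubscriber, publishAndCollect) are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowDemo {

	// Subscriber that requests one item at a time (back-pressure) and collects what it receives.
	static class CollectingSubscriber implements Flow.Subscriber<String> {
		final List<String> received = new CopyOnWriteArrayList<>();
		final CountDownLatch done = new CountDownLatch(1);
		private Flow.Subscription subscription;

		@Override
		public void onSubscribe(Flow.Subscription subscription) {
			this.subscription = subscription;
			subscription.request(1); // ask for the first item
		}

		@Override
		public void onNext(String item) {
			received.add(item);
			subscription.request(1); // ask for the next item only after handling this one
		}

		@Override
		public void onError(Throwable throwable) {
			done.countDown();
		}

		@Override
		public void onComplete() {
			done.countDown();
		}
	}

	// Publish the items asynchronously and wait until the subscriber has seen the completion signal.
	static List<String> publishAndCollect(List<String> items) {
		CollectingSubscriber subscriber = new CollectingSubscriber();
		// close() at the end of the try block signals onComplete after the submitted items.
		try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
			publisher.subscribe(subscriber);
			items.forEach(publisher::submit);
		}
		try {
			subscriber.done.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}
		return subscriber.received;
	}

	public static void main(String[] args) {
		System.out.println(publishAndCollect(List.of("a", "b", "c"))); // prints [a, b, c]
	}
}
```

The Flow interfaces only define the contract. Operators such as map or filter are what libraries like Reactor and RxJava add on top.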

What is driving the rise of Reactive programming?

Well, it’s not all just about the technology. The driver is efficient resource utilization, or in other words, spending less money on servers and data centres.

The promise of Reactive is that you can do more with less, specifically you can process higher loads with fewer threads. This is where the intersection of Reactive and non-blocking, asynchronous I/O comes to the foreground.

For the right problem, the effects are elegant. For the wrong problem, the effects might go into reverse (you actually make things worse).
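To illustrate the "more with fewer threads" idea, here is a small sketch that uses only the JDK. It assumes that a 50 ms timer is a fair stand-in for a non-blocking I/O call. FewThreads, simulatedIo and sumOfCalls are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FewThreads {

	// Complete a future after a delay without parking a thread for the duration of the wait.
	static CompletableFuture<Integer> simulatedIo(ScheduledExecutorService timer, int value) {
		CompletableFuture<Integer> result = new CompletableFuture<>();
		timer.schedule(() -> result.complete(value), 50, TimeUnit.MILLISECONDS);
		return result;
	}

	// Start all simulated I/O operations on ONE timer thread and sum their results.
	static int sumOfCalls(int calls) {
		ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
		try {
			List<CompletableFuture<Integer>> pending = new ArrayList<>();
			for (int i = 1; i <= calls; i++) {
				pending.add(simulatedIo(timer, i));
			}
			// All waits overlap, so the total time is close to one delay, not 'calls' delays.
			return pending.stream().mapToInt(CompletableFuture::join).sum();
		} finally {
			timer.shutdown();
		}
	}

	public static void main(String[] args) {
		System.out.println(sumOfCalls(100)); // prints 5050 (1 + 2 + ... + 100)
	}
}
```

With a blocking style, each of the 100 waits would hold a thread of its own. Here a single timer thread serves all of them, because no thread is held while waiting.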

Reactive Programming using Spring Framework

The Spring framework uses Reactor internally for its own reactive support. Reactor is a Reactive Streams implementation that further extends the basic Reactive Streams Publisher contract with the Flux and Mono composable API types to provide declarative operations on data sequences of 0..N and 0..1.

Spring Framework 5 includes a new spring-webflux module. The module contains support for reactive HTTP and WebSocket clients as well as for reactive server web applications including REST, HTML browser, and WebSocket style interactions.

On the server-side WebFlux supports 2 distinct programming models:

  • Annotation-based with @Controller and the other annotations supported also with Spring MVC
  • Functional, Java 8 lambda style routing and handling

Both programming models are executed on the same reactive foundation that adapts non-blocking HTTP runtimes to the Reactive Streams API. The diagram below shows the server-side stack including traditional, Servlet-based Spring MVC on the left from the spring-webmvc module and also the reactive stack on the right from the spring-webflux module.

[Diagram: Spring MVC on the Servlet stack (left) and Spring WebFlux on the reactive stack (right)]

This tutorial uses the functional, Java 8 lambda style of routing and handling. If you want to implement the annotation-based programming model instead, see the WebFlux Framework documentation.

I will also use Spring Boot to create the example. The example performs CRUD (Create, Read, Update and Delete) operations using REST APIs.

Enough talking about Reactive Programming… Now let’s move on to the Spring Boot Functional Reactive programming example.

Prerequisites

Java 8/12/19, Spring Boot Webflux 2.2.6/3.1.3, Maven 3.6.3/3.8.5

Setup Project

The following pom.xml file can be used for your Maven based project:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-boot-reactive</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>19</maven.compiler.source>
		<maven.compiler.target>19</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.1.3</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Create Model

Create a model class that simply has a few attributes.

public class WebSite {

	private Integer id;
	private String url;
	private String title;

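	// Getters and setters (used when converting to and from JSON)
	public Integer getId() { return id; }
	public void setId(Integer id) { this.id = id; }
	public String getUrl() { return url; }
	public void setUrl(String url) { this.url = url; }
	public String getTitle() { return title; }
	public void setTitle(String title) { this.title = title; }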

}

Create Repository

Create the repository class below. It is the source of data to which you want to apply business or processing logic.

For this example, I will use this class as a source of data. Ideally your source of data should be any database or any external service.

Here I have initially provided my two websites in the repository that will help you to test the reactive application.

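import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.stereotype.Repository;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Repository
public class WebSiteRepository {

	// Thread-safe map, since requests may be served concurrently
	private static final Map<Integer, WebSite> WEBSITES = new ConcurrentHashMap<>();
	static {
		WebSite webSite1 = new WebSite();
		webSite1.setId(1);
		webSite1.setTitle("Roy Tutorials");
		webSite1.setUrl("https://roytuts.com");
		WEBSITES.put(1, webSite1);

		WebSite webSite2 = new WebSite();
		webSite2.setId(2);
		webSite2.setTitle("JEE Tutorials");
		webSite2.setUrl("https://roytuts.com");
		WEBSITES.put(2, webSite2);
	}
	// Next id to hand out; ids 1 and 2 are taken by the seed data
	private static final AtomicInteger ID_COUNTER = new AtomicInteger(3);

	public Flux<WebSite> findAll() {
		return Flux.fromIterable(WEBSITES.values());
	}

	public Mono<WebSite> findById(Integer id) {
		// justOrEmpty() completes empty for an unknown id; Mono.just(null) would throw
		return Mono.justOrEmpty(WEBSITES.get(id));
	}

	public Mono<Void> delete(Integer id) {
		WEBSITES.remove(id);
		return Mono.empty();
	}

	public Mono<Void> add(Mono<WebSite> webSite) {
		return webSite.doOnNext(wb -> {
			Integer id = ID_COUNTER.getAndIncrement();
			wb.setId(id);
			WEBSITES.put(id, wb);
		}).then();
	}

	public Mono<Void> update(Mono<WebSite> webSite) {
		// The request body must carry the id of the website to replace
		return webSite.doOnNext(wb -> {
			WEBSITES.put(wb.getId(), wb);
		}).then();
	}

}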

The repository above uses Flux and Mono, the Reactor types for sequences of 0..N and 0..1 elements described earlier.

Now you will see the two building blocks of the WebFlux functional programming model: Handler Functions and Router Functions.

Handler Functions

Writing handler functions as lambdas is convenient, but it hurts readability and maintainability when you deal with multiple functions. Therefore, it is recommended to group related handler functions into a handler or controller class.

For example, here is a handler class that exposes the reactive WebSite repository:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class WebSiteHandler {

	@Autowired
	private WebSiteRepository webSiteRepository;

	public Mono<ServerResponse> getAllWebSites(ServerRequest serverRequest) {
		Flux<WebSite> webSites = webSiteRepository.findAll();
		return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(webSites, WebSite.class);
	}

	public Mono<ServerResponse> addWebSite(ServerRequest serverRequest) {
		Mono<WebSite> webSite = serverRequest.bodyToMono(WebSite.class);
		return ServerResponse.ok().build(webSiteRepository.add(webSite));
	}

	public Mono<ServerResponse> updateWebSite(ServerRequest serverRequest) {
		Mono<WebSite> webSite = serverRequest.bodyToMono(WebSite.class);
		return ServerResponse.ok().build(webSiteRepository.update(webSite));
	}

	public Mono<ServerResponse> deleteWebSite(ServerRequest serverRequest) {
		Integer webSiteId = Integer.valueOf(serverRequest.pathVariable("id"));
		// Respond once the Mono<Void> returned by the repository completes
		return ServerResponse.ok().build(webSiteRepository.delete(webSiteId));
	}

	public Mono<ServerResponse> getWebSite(ServerRequest serverRequest) {
		Integer webSiteId = Integer.valueOf(serverRequest.pathVariable("id"));
		Mono<ServerResponse> notFound = ServerResponse.notFound().build();
		// Build the 200 response only when a WebSite is emitted; an empty Mono falls through to 404
		return webSiteRepository.findById(webSiteId)
				.flatMap(webSite -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(webSite))
				.switchIfEmpty(notFound);
	}

}

In the above class, incoming HTTP requests are handled by a HandlerFunction, which is essentially a function that takes a ServerRequest and returns a Mono<ServerResponse>.

ServerRequest and ServerResponse are immutable interfaces and both are fully reactive by building on top of Reactor: the request exposes the body as Flux or Mono; the response accepts any Reactive Streams Publisher as body.

ServerRequest gives access to various HTTP request elements: the method, URI, query parameters, and, through the separate ServerRequest.Headers interface, the headers.

Similarly, ServerResponse provides access to the HTTP response. Since it is immutable, you create a ServerResponse with a builder. The builder allows you to set the response status, add response headers, and provide a body.

getAllWebSites() is a handler function that returns all WebSite objects found in the repository as JSON.

getWebSite() is a handler function that returns a single WebSite object for a given id found in the repository as JSON.

addWebSite() and updateWebSite() are handler functions that store a new WebSite and update an existing WebSite contained in the request body, respectively.

Note that WebSiteRepository.add(Mono<WebSite>) returns Mono<Void>: an empty Mono that emits a completion signal when the website has been read from the request and stored. So I use the build(Publisher<Void>) method to send a response when that completion signal is received, i.e. when the WebSite has been added.

getWebSite() identifies the website via the path variable id. It retrieves that WebSite via the repository and creates a JSON response if it is found. If it is not found, switchIfEmpty(Mono<T>) returns a 404 Not Found response.

Router Functions

Incoming requests are routed to handler functions with a RouterFunction, which is a function that takes a ServerRequest, and returns a Mono<HandlerFunction>. If a request matches a particular route, a handler function is returned; otherwise it returns an empty Mono.
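The routing contract described above can be pictured with a small model in plain Java. This is not the Spring API: Request, Handler and Router are hypothetical stand-ins for ServerRequest, HandlerFunction and RouterFunction, and Optional plays the role of the possibly empty Mono.

```java
import java.util.Optional;
import java.util.function.Function;

class RouterSketch {

	// Stand-in for ServerRequest; NOT the Spring type.
	static final class Request {
		final String method;
		final String path;

		Request(String method, String path) {
			this.method = method;
			this.path = path;
		}
	}

	// Stand-in for HandlerFunction: turns a request into a response (a plain String here).
	interface Handler extends Function<Request, String> {
	}

	// Stand-in for RouterFunction: returns a handler, or empty if the route does not match.
	interface Router extends Function<Request, Optional<Handler>> {

		// Try this router first, then the other one, similar in spirit to andRoute(...).
		default Router otherwise(Router other) {
			return request -> this.apply(request).or(() -> other.apply(request));
		}
	}

	// Create a router that matches exactly one method and path.
	static Router route(String method, String path, Handler handler) {
		return request -> request.method.equals(method) && request.path.equals(path)
				? Optional.of(handler)
				: Optional.empty();
	}

	// Look up a handler and invoke it; fall back to a 404 text when no route matches.
	static String dispatch(Router router, Request request) {
		return router.apply(request).map(handler -> handler.apply(request)).orElse("404 Not Found");
	}

	static final Router WEBSITE_ROUTES = route("GET", "/website", request -> "all websites")
			.otherwise(route("POST", "/website", request -> "website added"));

	public static void main(String[] args) {
		System.out.println(dispatch(WEBSITE_ROUTES, new Request("GET", "/website"))); // prints "all websites"
		System.out.println(dispatch(WEBSITE_ROUTES, new Request("DELETE", "/website"))); // prints "404 Not Found"
	}
}
```

The real RouterFunction works on reactive types and supports path variables, but the shape is the same: a function from a request to an optional handler, composed from smaller routes.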

Given the WebSiteHandler I showed above, I can now define a router function that routes to the respective handler functions.

I will use method references to refer to the handler functions:

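import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration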
public class WebSiteRouter {

	@Bean
	public RouterFunction<ServerResponse> route(WebSiteHandler webSiteHandler) {
		RouterFunction<ServerResponse> webSiteRoute = RouterFunctions
				.route(RequestPredicates.GET("/website").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
						webSiteHandler::getAllWebSites)
				.andRoute(RequestPredicates.GET("/website/{id}")
						.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), webSiteHandler::getWebSite)
				.andRoute(
						RequestPredicates.POST("/website").and(RequestPredicates.accept(MediaType.APPLICATION_JSON))
								.and(RequestPredicates.contentType(MediaType.APPLICATION_JSON)),
						webSiteHandler::addWebSite)
				.andRoute(
						RequestPredicates.PUT("/website").and(RequestPredicates.accept(MediaType.APPLICATION_JSON))
								.and(RequestPredicates.contentType(MediaType.APPLICATION_JSON)),
						webSiteHandler::updateWebSite)
				.andRoute(
						RequestPredicates.DELETE("/website/{id}"),
						webSiteHandler::deleteWebSite);
		return webSiteRoute;
	}

}

Besides router functions, you can also compose request predicates, by calling RequestPredicate.and(RequestPredicate) or RequestPredicate.or(RequestPredicate).

These work as expected: with and, the resulting predicate matches only if both given predicates match; with or, it matches if either predicate does. Most of the predicates found in RequestPredicates are compositions.
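The and/or semantics can be tried out with the JDK's java.util.function.Predicate, used here as an analogy for RequestPredicate. The Request class and the method and contentType helpers are hypothetical and are not Spring types.

```java
import java.util.function.Predicate;

class PredicateComposition {

	// Minimal stand-in for a request; NOT Spring's ServerRequest.
	static final class Request {
		final String method;
		final String contentType;

		Request(String method, String contentType) {
			this.method = method;
			this.contentType = contentType;
		}
	}

	// Matches requests with the given HTTP method.
	static Predicate<Request> method(String method) {
		return request -> request.method.equals(method);
	}

	// Matches requests with the given Content-Type.
	static Predicate<Request> contentType(String contentType) {
		return request -> contentType.equals(request.contentType);
	}

	// (POST or PUT) and JSON: calls are evaluated left to right, so 'or' is applied first.
	static final Predicate<Request> JSON_WRITE =
			method("POST").or(method("PUT")).and(contentType("application/json"));

	public static void main(String[] args) {
		System.out.println(JSON_WRITE.test(new Request("POST", "application/json"))); // prints true
		System.out.println(JSON_WRITE.test(new Request("GET", "application/json"))); // prints false
		System.out.println(JSON_WRITE.test(new Request("PUT", "text/plain"))); // prints false
	}
}
```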

Notice how the router above passes method references where functional interfaces are expected. This functional style is why the tutorial is titled Spring Boot Functional Reactive Programming example.

Spring Boot Main Class

I need to run the application on a server. The main class below lets it run as a standalone application. Along the way, you use Reactive Spring’s support for embedding the Netty server as the HTTP runtime, instead of deploying to an external instance.

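import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication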
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

}

Reactive programming is just about finding ways to stream data and events through a pipeline, rather than dealing discretely with each event, change, or response. You can develop your application not as a set of interconnected callback functions and event listeners, but as a logical flow from events to actions, which themselves flow into other things.

Testing the Reactive Program

Now you can test the application using any client of your choice. You can use the curl command, write a Java web client, or build your own client using ReactJS, Angular(JS) or any other technology to consume the services.

I used the Postman tool to test the application. All requests go to http://localhost:8080, and the paths come from the router above.

To get the list of websites: GET /website

To create new website information (you will get a 200 status): POST /website with a JSON body

To fetch the updated list of websites: GET /website

To update existing website information: PUT /website with a JSON body that includes the id

To fetch the updated list of websites again: GET /website

To delete existing website information: DELETE /website/{id}

To get a single website: GET /website/{id}

The video below shows how to test the application with a REST client.

The video does not include testing the delete operation, so it’s left as an exercise for you.

HTTP Method – DELETE

URL – http://localhost:8080/website/3

Headers – Content-Type : application/json

Response – 200
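If you would rather send this DELETE request from Java than from a REST client, a minimal sketch with the JDK's java.net.http.HttpClient (Java 11 or later) could look like this. It assumes the application is running locally on port 8080. DeleteWebSiteClient and deleteRequest are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class DeleteWebSiteClient {

	// Build the DELETE request described above for the given website id.
	static HttpRequest deleteRequest(String baseUrl, int id) {
		return HttpRequest.newBuilder(URI.create(baseUrl + "/website/" + id))
				.header("Content-Type", "application/json")
				.DELETE()
				.build();
	}

	public static void main(String[] args) throws Exception {
		// Assumes the application from this tutorial is running on localhost:8080.
		HttpClient client = HttpClient.newHttpClient();
		HttpResponse<Void> response = client.send(deleteRequest("http://localhost:8080", 3),
				HttpResponse.BodyHandlers.discarding());
		System.out.println(response.statusCode());
	}
}
```

HttpClient also has a sendAsync method that returns a CompletableFuture, which fits the non-blocking style of this tutorial better than the blocking send used here for brevity.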

Congratulations! You have developed a Spring Boot Functional Reactive Programming example.

You may also like to read Spring Boot + Angular Functional Reactive Programming

