How to produce and consume message using Apache Kafka and Spring Boot

Introduction

In this tutorial I will show you how to produce and send message using Apache Kafka and Spring Boot. Apache Kafka is supported by providing auto-configuration of the spring-kafka project. Kafka configuration is controlled by external configuration properties in spring.kafka.*. I will show you how to build this application using both maven and gradle build tools.

I will create a new topic on application start up. You need to make sure that ZooKeeper server and Kafka server or broker are running. You also need to make sure that a topic exists before you send or publish message to the topic.

Prerequisites

Eclipse 4.12, At least Java 8, Maven 3.6.1, Gradle 5.6, Spring Boot 2.2.2, Spring Kafka 2.3.4

How to setup and work with Apache Kafka in Windows Environment

Create Project

Let’s create a project either maven or gradle based in Eclipse IDE. The name of the project is spring-apache-kafka-producer-consumer.

If you are creating gradle based project then you can use below build.gradle script:

buildscript {
	ext {
		springBootVersion = '2.2.2.RELEASE'
	}
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

plugins {
    id 'java-library'
    id 'org.springframework.boot' version '2.2.2.RELEASE'
}

sourceCompatibility = 12
targetCompatibility = 12

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter:${springBootVersion}")
    implementation 'org.springframework.kafka:spring-kafka:2.3.4.RELEASE'
}

If you are creating maven based project then you can use below pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-apache-kafka-producer-consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.2.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.3.4.RELEASE</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.1</version>
				<configuration>
					<source>at least 8</source>
					<target>at least 8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

Application Properties

We will create below application.properties file under classpath directory src/main/resources to configure the Kafka settings:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=roytutsGroup

topic.name=roytuts

Create Topic

To publish a message you must have an existing topic. We will create a topic during application start up.

To create a topic we need to add a bean of type NewTopic. If the topic already exists then this bean is ignored.

package com.roytuts.spring.apache.kafka.producer.consumer.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

	@Value("${topic.name}")
	private String topic;

	@Bean
	public NewTopic newTopic() {
		NewTopic newTopic = new NewTopic(topic, 1, (short) 1);

		return newTopic;
	}

}

Send Message

Spring’s KafkaTemplate is auto-configured and it can be autowired directly into bean to send a message.

You will get different overloaded methods of send() and you can choose according to your needs.

package com.roytuts.spring.apache.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

	@Value("${topic.name}")
	private String topic;

	@Autowired
	private KafkaTemplate<Integer, String> kafkaTemplate;

	public void produceMessage(String msg) {
		kafkaTemplate.send(topic, msg);
	}

}

Receive Message

With Apache Kafka infrastructure a bean can be annotated with @KafkaListener to create a listener endpoint on a topic.

The following example creates endpoint on topic roytuts and receives message from it:

package com.roytuts.spring.apache.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

	@KafkaListener(topics = "${topic.name}", groupId = "${spring.kafka.consumer.group-id}")
	public void consumeMessage(String msg) {
		System.out.println("Message received: " + msg);
	}

}

Create Main Class

A main class means a class is having the main method that starts the application. So in our Spring Boot application main class is enough to run the application.

package com.roytuts.spring.apache.kafka.producer.consumer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.apache.kafka.producer.MessageProducer;

@SpringBootApplication(scanBasePackages = "com.roytuts.spring.apache.kafka")
public class SpringKafkaProdConsApp implements CommandLineRunner {

	@Autowired
	private MessageProducer messageProducer;

	private CountDownLatch countDownLatch;

	public static void main(String[] args) {
		SpringApplication.run(SpringKafkaProdConsApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		countDownLatch = new CountDownLatch(1);
		countDownLatch.await(10, TimeUnit.SECONDS);
		messageProducer.produceMessage("This message is sent to topic -> roytuts");
	}

}

Testing the Application

Make sure your ZooKeeper server and Kafka broker are running before you run the main class.

Running the main class will produce below output in the console:

Message received: This message is sent to topic -> roytuts

Source Code

Download

Thanks for reading.

Leave a Reply

Your email address will not be published. Required fields are marked *