Introduction
In this tutorial I will show you how to produce and send message using Apache Kafka and Spring Boot. Apache Kafka is supported by providing auto-configuration of the spring-kafka project. Kafka configuration is controlled by external configuration properties in spring.kafka.*
. I will show you how to build this application using both maven and gradle build tools.
I will create a new topic on application start up. You need to make sure that ZooKeeper server and Kafka server or broker are running. You also need to make sure that a topic exists before you send or publish message to the topic.
Prerequisites
Eclipse 4.12, At least Java 8, Maven 3.6.1, Gradle 5.6, Spring Boot 2.2.2, Spring Kafka 2.3.4
How to setup and work with Apache Kafka in Windows Environment
Create Project
Let’s create a project either maven or gradle based in Eclipse IDE. The name of the project is spring-apache-kafka-producer-consumer.
If you are creating gradle based project then you can use below build.gradle script:
buildscript {
ext {
springBootVersion = '2.2.2.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
plugins {
id 'java-library'
id 'org.springframework.boot' version '2.2.2.RELEASE'
}
sourceCompatibility = 12
targetCompatibility = 12
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter:${springBootVersion}")
implementation 'org.springframework.kafka:spring-kafka:2.3.4.RELEASE'
}
If you are creating maven based project then you can use below pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.roytuts</groupId>
<artifactId>spring-apache-kafka-producer-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>at least 8</source>
<target>at least 8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Application Properties
We will create below application.properties file under classpath directory src/main/resources to configure the Kafka settings:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=roytutsGroup
topic.name=roytuts
Create Topic
To publish a message you must have an existing topic. We will create a topic during application start up.
To create a topic we need to add a bean of type NewTopic
. If the topic already exists then this bean is ignored.
package com.roytuts.spring.apache.kafka.producer.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaConfig {
@Value("${topic.name}")
private String topic;
@Bean
public NewTopic newTopic() {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
return newTopic;
}
}
Send Message
Spring’s KafkaTemplate is auto-configured and it can be autowired directly into bean to send a message.
You will get different overloaded methods of send()
and you can choose according to your needs.
package com.roytuts.spring.apache.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
@Value("${topic.name}")
private String topic;
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;
public void produceMessage(String msg) {
kafkaTemplate.send(topic, msg);
}
}
Receive Message
With Apache Kafka infrastructure a bean can be annotated with @KafkaListener
to create a listener endpoint on a topic.
The following example creates endpoint on topic roytuts and receives message from it:
package com.roytuts.spring.apache.kafka.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@KafkaListener(topics = "${topic.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consumeMessage(String msg) {
System.out.println("Message received: " + msg);
}
}
Create Main Class
A main class means a class is having the main method that starts the application. So in our Spring Boot application main class is enough to run the application.
package com.roytuts.spring.apache.kafka.producer.consumer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.roytuts.spring.apache.kafka.producer.MessageProducer;
@SpringBootApplication(scanBasePackages = "com.roytuts.spring.apache.kafka")
public class SpringKafkaProdConsApp implements CommandLineRunner {
@Autowired
private MessageProducer messageProducer;
private CountDownLatch countDownLatch;
public static void main(String[] args) {
SpringApplication.run(SpringKafkaProdConsApp.class, args);
}
@Override
public void run(String... args) throws Exception {
countDownLatch = new CountDownLatch(1);
countDownLatch.await(10, TimeUnit.SECONDS);
messageProducer.produceMessage("This message is sent to topic -> roytuts");
}
}
Testing the Application
Make sure your ZooKeeper server and Kafka broker are running before you run the main class.
Running the main class will produce below output in the console:
Message received: This message is sent to topic -> roytuts
Source Code
Thanks for reading.