Apache Kafka Consumer and Broker Failover in Multi-Broker Clusters

Introduction

In this tutorial I am going to show you how Apache Kafka consumer failover and broker failover work in multi-broker clusters. Kafka is a distributed event streaming platform that lets you read, write, store, and process events (also called records or messages in the documentation) across many machines.

Example events are payment transactions, geolocation updates from mobile phones, shipping orders, sensor measurements from IoT devices or medical equipment, and much more. These events are organized and stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.

A Kafka cluster consists of one or more servers (Kafka brokers) that can span multiple datacenters or cloud regions. A Kafka cluster is highly scalable and fault-tolerant: if any of its servers fails, the other servers take over its work to ensure continuous operation without any data loss. Producers push records into Kafka topics on the brokers, and consumers pull records off those topics.


Failover is the ability to switch automatically and seamlessly to a reliable backup or secondary system. In other words, it is a procedure by which a system automatically transfers control to a duplicate system when it detects a fault or failure. When a primary system fails, either a standby operational mode or redundancy should achieve failover and lessen or eliminate the negative impact on end users of the system.

A replication factor is the number of copies of your data kept across multiple brokers. By default, the replication factor is set to 1. The recommended replication factor for production environments is 3, which means that 3 brokers are required and there will always be three copies of your data.

Topics are partitioned, meaning a topic is spread over a number of “buckets” located on different Kafka brokers. This distributed placement of your data is very important for scalability because it allows client applications to both read and write the data from/to many brokers at the same time. When a new event is published to a topic, it is actually appended to one of the topic’s partitions. Events with the same event key (e.g., a customer or vehicle ID) are written to the same partition, and Kafka guarantees that any consumer of a given topic-partition will always read that partition’s events in exactly the same order as they were written.
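
The key-to-partition mapping can be sketched in a few lines of Python. This is illustrative only: the real Kafka client hashes the serialized key with murmur2, and I am using crc32 as a stand-in here; the point is simply that the mapping is deterministic.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # Deterministically map a record key to a partition.
    # (Kafka's DefaultPartitioner uses murmur2; crc32 is a stand-in.)
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# The same key always lands on the same partition,
# which is what preserves per-key ordering.
assert partition_for("customer-42", 15) == partition_for("customer-42", 15)
```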

Topic partitions can be replicated across multiple nodes for failover. For that, the topic should have a replication factor greater than 1 (typically 2 or 3).
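
The Replicas column you will see later in the describe output follows a round-robin pattern over the brokers. Here is a simplified Python sketch of that placement (the real broker randomizes the starting offset and shifts the followers; assign_replicas is my own illustrative helper):

```python
def assign_replicas(num_partitions, broker_ids, replication_factor, start=0):
    """Simplified replica placement: each partition's replicas are picked
    round-robin over the broker list, starting from an offset."""
    n = len(broker_ids)
    return [
        [broker_ids[(start + p + r) % n] for r in range(replication_factor)]
        for p in range(num_partitions)
    ]

# With 3 brokers and replication factor 3, every broker holds a copy of
# every partition, and leadership (the first replica) rotates.
layout = assign_replicas(3, [0, 1, 2], 3)
assert layout == [[0, 1, 2], [1, 2, 0], [2, 0, 1]]
```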

Each partition in Kafka has one leader broker and zero or more follower brokers. The leader handles all read and write requests for the partition; the followers replicate the leader and take over if it dies.
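
That takeover can be sketched as picking the first replica, in assignment order, that is still in sync. This is a simplification of Kafka's actual leader election, and elect_leader is a hypothetical helper:

```python
def elect_leader(replicas, isr):
    """Pick the partition leader: the first broker in the replica list
    that is still in the in-sync replica set (simplified)."""
    for broker_id in replicas:
        if broker_id in isr:
            return broker_id
    return None  # no in-sync replica left: the partition is offline

# All brokers alive: the preferred (first) replica leads.
assert elect_leader([2, 1, 0], {0, 1, 2}) == 2
# Broker 2 dies: leadership fails over to the next in-sync replica.
assert elect_leader([2, 1, 0], {0, 1}) == 1
```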

Why do you need failover?

Think of a situation where a hardware failure, a software failure, a power outage, a denial-of-service attack, or some other event takes down an entire datacenter (DC) running a Kafka cluster. Kafka can keep running in another datacenter, because that datacenter already holds a copy of the data from the original one, replicated under the same topic names.

So client applications switch from the failed cluster to the running cluster and automatically resume consuming data in the new datacenter (DC) from where they left off in the original one. The business therefore suffers minimal downtime and data loss from the disaster, and continues to run its mission-critical applications.

About the Example

Setting up multiple datacenters is out of scope for this tutorial; here I am going to demonstrate consumer failover and broker failover in a multi-broker cluster. I will first show how Kafka acts as publish/subscribe when consumers are in different groups. Then I will use the same group for all the consumers, and Kafka will share the message load among the members of that group.

Prerequisites

Apache Kafka 2.8.0 (kafka_2.13-2.8.0 build), Windows 10 64-bit OS

You can check how to set up Kafka on Windows.

Kafka server.properties files

As I am going to start multiple Kafka servers (brokers), I need to create multiple server.properties files; I will create three for this example. The server.properties file is generally available under the kafka_2.13-2.8.0\config folder. I am going to copy the existing server.properties file into two new files: server-1.properties and server-2.properties.

Next I am going to put a unique broker id, port, and log location into each of the new properties files. I am not going to change anything in the original server.properties file, leaving it with its default values (broker.id=0, port 9092).

server-1.properties

broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-1

server-2.properties

broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs-2

Start Zookeeper Server

Execute the following batch file to run the ZooKeeper server. ZooKeeper will start listening on localhost:2181. Make sure you run the following command from Kafka's root (home) directory.

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Start Kafka Server

Here I will start three Kafka servers using the following commands. Run each command in a separate command prompt, from Kafka's root (home) directory.

bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-server-start.bat config\server-1.properties
bin\windows\kafka-server-start.bat config\server-2.properties

Replicated Topic

Now I will create a new topic with a replication factor of 3 and 15 partitions. Make sure you run the following command from Kafka's root (home) directory.

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 15 --topic my-replicated-topic

Once the topic is created, you will see the following confirmation message:

Created topic my-replicated-topic.

If you need to alter the number of partitions, you can use the following command. Here I am increasing the partition count from 15 to 20:

bin\windows\kafka-topics.bat --alter --zookeeper localhost:2181 --topic my-replicated-topic --partitions 20

If you increase the number of partitions, you will get a warning message as shown below:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

You can also see the description of the topic you have created using the following command:

bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic

Once you execute the above command in command line tool then you will see the following output:

Topic: my-replicated-topic      TopicId: QG4rZZwkRJal2OUxbxHYNQ PartitionCount: 15      ReplicationFactor: 3    Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: my-replicated-topic      Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: my-replicated-topic      Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: my-replicated-topic      Partition: 3    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: my-replicated-topic      Partition: 4    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: my-replicated-topic      Partition: 5    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: my-replicated-topic      Partition: 6    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: my-replicated-topic      Partition: 7    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: my-replicated-topic      Partition: 8    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: my-replicated-topic      Partition: 9    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: my-replicated-topic      Partition: 10   Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: my-replicated-topic      Partition: 11   Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: my-replicated-topic      Partition: 12   Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: my-replicated-topic      Partition: 13   Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: my-replicated-topic      Partition: 14   Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

In the above output, the first line gives a summary of all the partitions; each additional line gives information about one partition. Since I created 15 partitions for this topic, there are 15 such lines.

  • “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • “replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
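
For scripting, one partition row of the describe output can be parsed into structured data, which also makes it easy to check the invariant that a healthy leader always appears in the ISR. parse_partition_line is an illustrative helper based on the 2.8 output format shown above:

```python
def parse_partition_line(line):
    """Parse one partition row of `kafka-topics --describe` output
    (format as printed by Kafka 2.8) into a dict."""
    tokens = line.split()
    info = dict(zip(tokens[0::2], tokens[1::2]))
    return {
        "topic": info["Topic:"],
        "partition": int(info["Partition:"]),
        "leader": int(info["Leader:"]),
        "replicas": [int(b) for b in info["Replicas:"].split(",")],
        "isr": [int(b) for b in info["Isr:"].split(",")],
    }

row = parse_partition_line(
    "Topic: my-replicated-topic  Partition: 0  Leader: 2  Replicas: 2,1,0  Isr: 2,1,0"
)
assert row["leader"] in row["isr"]  # a healthy leader is always in sync
```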

Start Kafka Consumer

Next I am going to start Kafka consumers that will use the replicated topic. Use the following command to start each consumer in a separate command prompt.

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093 --topic my-replicated-topic --from-beginning

Notice I have passed a list of Kafka servers to the --bootstrap-server parameter. Only one server is needed for bootstrapping, but listing multiple servers ensures the client can still connect when one of them is down.

Start Kafka Producer

I am passing a list of servers to the --broker-list parameter. (In Kafka 2.8 the console producer also accepts --bootstrap-server, which is the preferred option; --broker-list is deprecated.)

bin\windows\kafka-console-producer.bat --broker-list localhost:9093,localhost:9094 --topic my-replicated-topic

Now in the producer console you will see a blinking cursor waiting for input. Type a message and press the ENTER key, for example:

kafka multi-brokers cluster

Then you will see the same message on the consumer console:

kafka multi-brokers cluster

You can type many more messages in producer console and you can see them in the consumer console.

If you start more consumer consoles, you will see that all messages are consumed by all consumers. For example, if you start another consumer console as given below:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093 --topic my-replicated-topic --from-beginning

Then you will see the earlier messages replayed in the new consumer console (because of --from-beginning), for example:

Hey, How are you?

Messages are delivered to all consumers because the consumers are in different groups: each console consumer started without an explicit group.id is placed in its own automatically generated consumer group.

Therefore, in the above example, every message was delivered to each of the consumers, since each was in a different group. Each consumer group subscribes to the topic and maintains its own offset per partition in that topic.

Change Consumer Group

Now let us put all the consumers into one consumer group of their own. First stop the producer and the consumers, but leave Kafka and ZooKeeper running.

Now modify the command to include --consumer-property group.id=roygroup when running the consumers that use the replicated topic. Here I am adding a consumer group, and all consumers will be in this same group.

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093 --topic my-replicated-topic --from-beginning --consumer-property group.id=roygroup

The consumers will now share the messages, because each consumer in the consumer group gets its own share of the partitions. The --consumer-property group.id=roygroup option puts every consumer into the roygroup consumer group.
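
The partition sharing can be sketched as follows. Note this is a simplification: assign_partitions is my own helper mimicking Kafka's round-robin assignor, while the 2.8 consumer actually defaults to the range assignor. Either way, 15 partitions over 3 consumers gives each consumer 5 partitions:

```python
def assign_partitions(partitions, consumers):
    """Distribute topic partitions across the members of one consumer
    group, round-robin (simplified group assignment)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Each consumer sees only the messages routed to its own partitions.
groups = assign_partitions(range(15), ["consumer-1", "consumer-2", "consumer-3"])
assert all(len(parts) == 5 for parts in groups.values())
```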

Now start the producer console, and start the consumer console two or more times using the modified command above.

Now I am sending the following messages from producer console:

>Hey, how are you?
>Welcome to Roy Tutorials
>Message 1
>Message 2
>Message 3
>Message 4
>M5
>M6
>M7
>M8
>M9
>M10
>

I started three consumer consoles, and the above messages were distributed among the consumers in the following way:

Consumer Console 1

Welcome to Roy Tutorials
Message 2
Message 3
M5
M6
M7

Consumer Console 2

M10

Consumer Console 3

Hey, how are you?
Message 1
Message 4
M8
M9

Because the consumers are in the same group, each gets its share of the messages.

Kafka Consumer Failover

Now I am going to show you how consumer failover happens: when one of the consumers goes down, the messages should be redistributed among the consumers that are still running. So I am going to kill the second consumer (the one above that got only message M10), send more messages, and see how they get distributed between the two remaining consumers.

Now I am sending the following messages in producer console:

>M1
>M2
>M3
>M4
>M5
>M6
>M7
>M8
>

The first consumer got the following messages:

M5
M6

The third consumer (now the second, after the second one was killed) got the following messages:

M1
M2
M3
M4
M7
M8

Therefore, even after killing one consumer, the messages were distributed among the live consumers.
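
What happened under the hood is a group rebalance: Kafka recomputes the whole assignment over the surviving members rather than just moving the dead consumer's partitions. Here is a simplified sketch (rebalance is an illustrative helper, not a Kafka API):

```python
def rebalance(partitions, consumers):
    """Recompute the group's partition assignment over the current
    (surviving) members, round-robin (simplified rebalance)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

before = rebalance(range(6), ["c1", "c2", "c3"])
after = rebalance(range(6), ["c1", "c3"])  # c2 was killed
# Every partition is still owned, now split between the two survivors.
assert sorted(p for ps in after.values() for p in ps) == list(range(6))
```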

Kafka Broker Failover

Now you will see how to test Kafka broker failover. To test fault tolerance I will kill the first broker (broker.id=0, the one started with server.properties), since it is acting as the leader for several partitions.

To kill it you can use either of two methods: press CTRL+C in that broker's window, or find its process id with the following command and then kill that process id.

wmic process where "caption = 'java.exe' and commandline like '%server.properties%'" get processid

The above command will give you the process id as follows:

ProcessId
12436

Now you can run the following command to kill the process:

taskkill /F /PID 12436

Your process will get killed and you will get success message:

SUCCESS: The process with PID 12436 has been terminated.

So the first broker (broker.id=0) is stopped.

Now when you describe the topic, you will see that Kafka has redistributed leadership among the second and third brokers (broker ids 1 and 2):

bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic

The above command will give you the following output:

Topic: my-replicated-topic      TopicId: QG4rZZwkRJal2OUxbxHYNQ PartitionCount: 15      ReplicationFactor: 3    Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 1,2
        Topic: my-replicated-topic      Partition: 1    Leader: 1       Replicas: 0,1,2 Isr: 1,2
        Topic: my-replicated-topic      Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 1,2
        Topic: my-replicated-topic      Partition: 3    Leader: 2       Replicas: 2,1,0 Isr: 1,2
        Topic: my-replicated-topic      Partition: 4    Leader: 2       Replicas: 0,2,1 Isr: 1,2
        Topic: my-replicated-topic      Partition: 5    Leader: 1       Replicas: 1,0,2 Isr: 1,2
        Topic: my-replicated-topic      Partition: 6    Leader: 2       Replicas: 2,0,1 Isr: 1,2
        Topic: my-replicated-topic      Partition: 7    Leader: 1       Replicas: 0,1,2 Isr: 1,2
        Topic: my-replicated-topic      Partition: 8    Leader: 1       Replicas: 1,2,0 Isr: 1,2
        Topic: my-replicated-topic      Partition: 9    Leader: 2       Replicas: 2,1,0 Isr: 1,2
        Topic: my-replicated-topic      Partition: 10   Leader: 2       Replicas: 0,2,1 Isr: 1,2
        Topic: my-replicated-topic      Partition: 11   Leader: 1       Replicas: 1,0,2 Isr: 1,2
        Topic: my-replicated-topic      Partition: 12   Leader: 2       Replicas: 2,0,1 Isr: 1,2
        Topic: my-replicated-topic      Partition: 13   Leader: 1       Replicas: 0,1,2 Isr: 1,2
        Topic: my-replicated-topic      Partition: 14   Leader: 1       Replicas: 1,2,0 Isr: 1,2

Now if you send messages from the producer console, you will see that they are still consumed by the consumers.

For example I have sent messages M1 and M2 from producer console and these messages were consumed by consumer 1 and consumer 2.

Producer Console

>M1
>M2
>

Consumer Console 1

M1

Consumer Console 2

M2

You will also see warning messages in the consumer consoles similar to the following:

WARN [Consumer clientId=consumer-roygroup-1, groupId=roygroup] Connection to node 0 (/9.160.183.5:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

This means the consumer is trying to connect to broker 0 (port 9092) but cannot, because I have terminated it. You will find similar warnings in the other two brokers' consoles.

So Kafka broker failover works as well.

Therefore, you have seen both consumer failover and Kafka broker failover in this example, along with how to set up a multi-broker cluster.

That's all about how consumer and broker failover work in Kafka multi-broker clusters.

