Erlang: how to develop a scalable Kafka consumer

Apache Kafka is a Java-based, open-source event streaming platform used to build scalable, high-throughput, fault-tolerant systems that handle real-time data feeds. If you are unfamiliar with Kafka, the official Kafka documentation gives a complete overview.

Scaling the consumer with Consumer Group concept

In Kafka, data is stored in topics. A topic can be created with several partitions. When a producer writes data to a Kafka topic, each record is stored in one of the partitions, by default in a round-robin manner (or the application can select a specific partition). A single consumer can read all messages from all the partitions in that topic. See the diagram below for a better understanding.

One consumer in consumer group

When more consumers are added to the consumer group, partitions are re-assigned among the consumers to share the load, as shown in the diagrams below.

Two consumers in consumer group
Three consumers in consumer group

What will happen if a fourth consumer is added to the consumer group? Each partition can be assigned to only one consumer in the group. So the first three consumers will be connected to the three partitions, and the remaining consumer will be idle, as shown in the diagram below.

Four consumers in consumer group

So, when designing a solution, we need to consider the number of partitions in the topic, since the scalability of the consumer application depends on it.

Now let’s move on to developing the Erlang Kafka consumer.

Prerequisites

  • Erlang/OTP and rebar3 installed
  • A running Kafka broker (the examples below assume localhost:9092)

Code Development

Creating the Project and Adding Erlang-Kafka library

$ rebar3 new release demoApp1
===> Writing apps/demoApp1/src/demoApp1_app.erl
===> Writing apps/demoApp1/src/demoApp1_sup.erl
===> Writing apps/demoApp1/src/demoApp1.app.src
===> Writing rebar.config
===> Writing config/sys.config
===> Writing config/vm.args
===> Writing .gitignore
===> Writing LICENSE
===> Writing README.md

After that, we will add Brod, an Apache Kafka client for Erlang, to the project. It is an Erlang implementation of the Apache Kafka protocol and supports both producers and consumers. We add the relevant configuration to the rebar.config and demoApp1.app.src files.

rebar.config
demoApp1.app.src
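The embedded file contents did not survive here; a minimal sketch of the relevant additions (the brod version shown is illustrative) might look like this:

```erlang
%% rebar.config -- add brod as a dependency (version is an assumption)
{erl_opts, [debug_info]}.
{deps, [
    {brod, "3.14.0"}
]}.

%% demoApp1.app.src -- make sure brod is started with the application
{application, demoApp1, [
    {description, "Kafka consumer demo"},
    {vsn, "0.1.0"},
    {registered, []},
    {applications, [kernel, stdlib, brod]},
    {mod, {demoApp1_app, []}},
    {env, []}
]}.
```

Listing brod under applications ensures rebar3 boots it before demoApp1 starts.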

Adding a Kafka Client to the app

kafka_client_svr.erl
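The gist embed is not reproduced above; a minimal sketch of such a server, assuming a broker on localhost:9092 and the module/client names used in this article, could be:

```erlang
%% kafka_client_svr.erl -- starts a named brod client (sketch)
-module(kafka_client_svr).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    KafkaBootstrapEndpoints = [{"localhost", 9092}],
    %% client1 is the client id referenced by the rest of the app
    Res = brod:start_client(KafkaBootstrapEndpoints, client1),
    io:format("Starting Kafka Client ~p~n", [Res]),
    {ok, #{}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

This cannot run without a reachable Kafka broker; the "Starting Kafka Client ok" line in the shell output later in the article matches the io:format above.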

Now we can use client1 inside the application to handle producers and consumers.

Consumer logic development

  • In init/1, call the brod:start_link_group_subscriber function. Pass the Kafka client (client1) we added earlier as a parameter. Another important parameter is GroupId: the unique id shared by all consumers in a consumer group. Kafka uses it to identify that these consumers all belong to the same group.
  • Add a callback function to process received Kafka messages. Once we connect to the Kafka topic in init/1, brod_group_subscriber listens to that topic continuously and, whenever a message arrives, calls the callback function with that data.

Please refer to the implementation below.

kafka_consumer_svr.erl
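The embedded source is missing here; a sketch of the consumer following the two steps above, assuming the group id (test_group_id) and topic (TestTopic) seen in the test logs, might be:

```erlang
%% kafka_consumer_svr.erl -- group subscriber callback module (sketch)
-module(kafka_consumer_svr).
-behaviour(brod_group_subscriber).

-export([start_link/0]).
%% brod_group_subscriber callbacks
-export([init/2, handle_message/4]).

start_link() ->
    brod:start_link_group_subscriber(
        client1,                     %% the client started earlier
        <<"test_group_id">>,         %% GroupId shared by all group members
        [<<"TestTopic">>],           %% topics to subscribe to
        [],                          %% group config (defaults)
        [{begin_offset, earliest}],  %% consumer config (illustrative)
        ?MODULE,                     %% callback module
        []).                         %% callback init argument

init(_GroupId, _CbInitArg) ->
    {ok, #{}}.

%% Called for every message received from any assigned partition.
handle_message(_Topic, _Partition, Message, State) ->
    io:format("Kafka message ~p~n", [Message]),
    {ok, ack, State}.
```

Returning {ok, ack, State} acknowledges the message so the group's committed offset advances; Message is a #kafka_message{} record, which is why the shell output later shows tuples starting with kafka_message.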

Adding client and consumer servers to supervisor

demoApp1_sup.erl
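The supervisor gist is also not shown; a sketch that adds both servers as children of the generated supervisor might be:

```erlang
%% demoApp1_sup.erl -- supervise the client and consumer servers (sketch)
-module(demoApp1_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 10},
    ChildSpecs = [
        %% the client must come up before the consumer subscribes
        #{id => kafka_client_svr,
          start => {kafka_client_svr, start_link, []}},
        #{id => kafka_consumer_svr,
          start => {kafka_consumer_svr, start_link, []}}
    ],
    {ok, {SupFlags, ChildSpecs}}.
```

With one_for_one and the client listed first, the client is started before the consumer at boot, and each child is restarted independently if it crashes.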

You can find the complete code at the link below.

Testing

Create Kafka topic

root@kasun-VirtualBox:~# /home/kafka/bin/kafka-topics.sh --create --topic TestTopic  --replication-factor 1   --partitions 2  --zookeeper localhost:2181
Created topic TestTopic.

Check with one consumer

Go to the demoApp1 location and start the application using the rebar3 shell command, as below.

root@kasun-VirtualBox:/home/kasun/demoApp1# rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling demoApp1
Erlang/OTP 22 [erts-10.6.4] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1]
Eshell V10.6.4 (abort with ^G)
1> Starting Kafka Client ok
===> Booted snappyer
===> Booted crc32cer
===> Booted kafka_protocol
===> Booted supervisor3
===> Booted brod
===> Booted demoApp1
===> Booted sasl
=INFO REPORT==== 21-Mar-2021::20:59:29.039196 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
elected=true
=INFO REPORT==== 21-Mar-2021::20:59:29.170970 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
assignments received:
TestTopic:
partition=0 begin_offset=undefined
partition=1 begin_offset=undefined
=INFO REPORT==== 21-Mar-2021::20:59:29.177380 ===
client client1 connected to kasun-VirtualBox:9092

Now write to the topic as below.

root@kasun-VirtualBox:~# echo "Hello, World" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
root@kasun-VirtualBox:~#

When we check the demoApp1 shell, we can see that the data has been read by the consumer.

1> Kafka message {kafka_message,0,<<>>,<<"Hello, World 1">>,create,1616344333665,
[]}
Kafka message {kafka_message,1,<<>>,<<"Hello, World 2">>,create,1616344336740,
[]}

Check with two consumers

Now start a second instance of the application (demoApp2) with the same group id. After the rebalance:

  • demoApp1 is listening to partition 0
  • demoApp2 is listening to partition 1

demoApp1

=INFO REPORT==== 21-Mar-2021::21:24:04.086674 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
re-joining group, reason:rebalance_in_progress
=INFO REPORT==== 21-Mar-2021::21:24:04.091575 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
elected=true
=INFO REPORT==== 21-Mar-2021::21:24:04.104967 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
assignments received:
TestTopic:
partition=0 begin_offset=1

demoApp2

=INFO REPORT==== 21-Mar-2021::21:24:04.093209 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
elected=false
=INFO REPORT==== 21-Mar-2021::21:24:04.105420 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
assignments received:
TestTopic:
partition=1 begin_offset=1
=INFO REPORT==== 21-Mar-2021::21:24:04.117680 ===
client client1 connected to kasun-VirtualBox:9092

Now let’s write some data to the Kafka topic and check each consumer’s output.

root@kasun-VirtualBox:~# echo "Hello, World 1" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
root@kasun-VirtualBox:~# echo "Hello, World 2" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null

When we check the demoApp1 and demoApp2 shell outputs, we can see that the data has been distributed across the two partitions and read by the consumer assigned to each partition.

demoApp1

1> 
1> Kafka message {kafka_message,1,<<>>,<<"Hello, World 2">>,create,1616343773113,
[]}

demoApp2

1> 
1> Kafka message {kafka_message,1,<<>>,<<"Hello, World 1">>,create,1616343773113,
[]}

Conclusion

I hope this gives you a basic understanding of how to develop a scalable Kafka consumer in Erlang. I look forward to your comments on the article.

Thanks for reading, and happy coding 😍
