Erlang: how to develop a scalable Kafka consumer

Apache Kafka is an open-source event streaming platform, written in Scala and Java, that is used to build scalable, high-throughput, fault-tolerant systems for handling real-time data feeds. If you are unfamiliar with Kafka, the official Kafka guide gives a complete introduction.

Scaling the consumer with the consumer group concept

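Scaling the consumer app is simple with Kafka consumer groups. Kafka’s consumer group protocol balances a topic’s partitions across the consumers in a group, so there is nothing to worry about on the application side. Each partition is assigned to only one consumer in the group, so any consumers beyond the number of partitions stay idle. Let’s see how it happens.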

[Figures: one, two, three and four consumers in a consumer group]
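The following is a simplified, hypothetical module called partition_assign_demo. It is not part of brod. It spreads a topic's partitions across group members in round-robin order, which makes the figures above concrete.

It only illustrates the idea. In the real protocol, the elected group leader computes the assignment. brod's default strategy (roundrobin_v2) is round-robin based, but this is not brod's code.

```erlang
%% partition_assign_demo.erl: hypothetical module, for illustration only.
-module(partition_assign_demo).
-export([assign/2]).

%% Assign each partition to exactly one member, round-robin by position.
%% Members beyond the number of partitions receive an empty list (idle).
-spec assign([term()], [non_neg_integer()]) -> [{term(), [non_neg_integer()]}].
assign(Members, Partitions) ->
    NumMembers = length(Members),
    %% Pair each partition with its index: [{0, P0}, {1, P1}, ...]
    Indexed = lists:zip(lists:seq(0, length(Partitions) - 1), Partitions),
    %% The member at position Pos gets every partition whose index
    %% leaves remainder Pos when divided by the number of members.
    [{Member, [P || {I, P} <- Indexed, I rem NumMembers =:= Pos]}
     || {Pos, Member} <- lists:zip(lists:seq(0, NumMembers - 1), Members)].
```

With a two-partition topic, a third or fourth consumer receives an empty list. This is why the partition count caps how far a consumer group can scale.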

Prerequisites

You need to set up Erlang, Kafka and rebar3 in your development environment. I’m not going to describe the installation steps in detail, since they vary depending on your environment. Please refer to the official installation guide of each tool.

Code Development

Creating the Project and Adding the Erlang Kafka Library

You can create the Erlang project using the rebar3 new release command.

$ rebar3 new release demoApp1
===> Writing apps/demoApp1/src/demoApp1_app.erl
===> Writing apps/demoApp1/src/demoApp1_sup.erl
===> Writing apps/demoApp1/src/demoApp1.app.src
===> Writing rebar.config
===> Writing config/sys.config
===> Writing config/vm.args
===> Writing .gitignore
===> Writing LICENSE
===> Writing README.md
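Then add brod, the Erlang Kafka client library, as a dependency in rebar.config. Also add it to the applications list in demoApp1.app.src so that it is started together with the app.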
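Below is a minimal sketch of both changes. It assumes brod is fetched from Hex without a pinned version. The app.src is trimmed to the relevant fields, so it may differ from the file rebar3 generated for you.

```erlang
%% rebar.config: add brod as a dependency (fetched from Hex).
%% No version is pinned here; pin one that fits your Kafka setup.
{deps, [brod]}.

%% apps/demoApp1/src/demoApp1.app.src (trimmed): brod is listed in
%% applications so it is started before demoApp1.
{application, demoApp1,
 [{description, "An OTP application"},
  {vsn, "0.1.0"},
  {registered, []},
  {mod, {demoApp1_app, []}},
  {applications, [kernel, stdlib, brod]},
  {env, []},
  {modules, []}
 ]}.
```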

Adding a Kafka Client to the app

The brod library provides a component called brod_client. It is a gen_server whose primary responsibility is to establish TCP connections to the Kafka brokers and maintain them. Once a client is started, producers and consumers anywhere in the application can use it. In this app, the client is started in the kafka_client_svr.erl module.
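Here is a minimal sketch of kafka_client_svr.erl. It assumes a broker on localhost:9092 and the client id client1 that appears in the logs below, and the original module may be structured differently. The sketch wraps brod:start_link_client/3 so that the application supervisor owns the client. The reconnect setting is an illustrative choice.

```erlang
%% kafka_client_svr.erl (sketch): starts a brod client linked to the caller.
-module(kafka_client_svr).
-export([start_link/0, client_id/0, endpoints/0]).

%% Assumed values, taken from the logs in this post; adjust to your setup.
client_id() -> client1.
endpoints() -> [{"localhost", 9092}].

start_link() ->
    %% Illustrative option: wait 10 s before retrying a failed connection.
    Config = [{reconnect_cool_down_seconds, 10}],
    %% Starts a brod_client gen_server registered under client_id(); it opens
    %% and maintains the TCP connections to the brokers.
    brod:start_link_client(endpoints(), client_id(), Config).
```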

Consumer logic development

There are two main points to note in the consumer logic:

  • Start a brod_group_subscriber in the init function to join the consumer group and connect to the Kafka topic. It then keeps listening to the partitions assigned to this member.
  • Add a callback function to process the received Kafka messages. Whenever a message arrives, brod_group_subscriber calls this callback with that message.

The consumer is implemented in kafka_consumer_svr.erl.
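Here is a minimal sketch of such a consumer. It assumes a brod client registered as client1 is already running. It starts the subscriber with brod_group_subscriber:start_link/7 and implements the init/2 and handle_message/4 callbacks.

The group id and topic come from the logs in this post. The offset commit policy, commit interval and begin_offset are assumptions. Unlike the original, the subscriber is started directly from start_link/0 rather than from a gen_server init function.

```erlang
%% kafka_consumer_svr.erl (sketch): joins a consumer group and prints messages.
-module(kafka_consumer_svr).
-behaviour(brod_group_subscriber).

-export([start_link/0]).
-export([init/2, handle_message/4]).   %% brod_group_subscriber callbacks

%% Assumed values, taken from the logs in this post.
-define(CLIENT_ID, client1).                 %% brod client started earlier
-define(GROUP_ID, <<"test_group_id">>).
-define(TOPICS, [<<"TestTopic">>]).

start_link() ->
    %% Commit offsets to Kafka so a member that takes over a partition
    %% resumes from the group's last committed offset.
    GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                   {offset_commit_interval_seconds, 5}],
    %% Where to start reading when the group has no committed offset yet.
    ConsumerConfig = [{begin_offset, earliest}],
    brod_group_subscriber:start_link(?CLIENT_ID, ?GROUP_ID, ?TOPICS,
                                     GroupConfig, ConsumerConfig,
                                     ?MODULE, []).

%% Called once when the subscriber starts; returns the callback state.
init(_GroupId, _InitArg) ->
    {ok, #{}}.

%% Called for every message on the partitions assigned to this member.
handle_message(Topic, Partition, Message, State) ->
    io:format("Kafka message ~p (topic ~s, partition ~p)~n",
              [Message, Topic, Partition]),
    %% ack marks the message as processed so its offset can be committed.
    {ok, ack, State}.
```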

Adding client and consumer servers to supervisor

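As the final step of the code development, let’s add both the client and consumer servers to the application supervisor in demoApp1_sup.erl. The client has to be listed first: the supervisor starts its children in order, and the consumer needs a running client to connect through.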
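Here is a minimal sketch of demoApp1_sup.erl. It assumes that both kafka_client_svr and kafka_consumer_svr export start_link/0. The rest_for_one strategy and the restart limits are assumptions rather than the original configuration. With rest_for_one, whenever the client restarts, the consumer listed after it is restarted too, so it never keeps running against a dead client.

```erlang
%% demoApp1_sup.erl (sketch): supervises the Kafka client and consumer.
-module(demoApp1_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% Assumed flags: at most 5 restarts within 10 seconds.
    SupFlags = #{strategy => rest_for_one, intensity => 5, period => 10},
    %% Children are started in list order: client first, then consumer.
    Client = #{id => kafka_client_svr,
               start => {kafka_client_svr, start_link, []},
               restart => permanent, shutdown => 5000, type => worker},
    Consumer = #{id => kafka_consumer_svr,
                 start => {kafka_consumer_svr, start_link, []},
                 restart => permanent, shutdown => 5000, type => worker},
    {ok, {SupFlags, [Client, Consumer]}}.
```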

Testing

Here comes the most awaited and interesting part. Let’s check how scalability is handled in our solution.

Create Kafka topic

First, let’s create our Kafka topic, TestTopic. To test the scalability of the consumer, we create it with two partitions, since a consumer group can have at most as many active consumers as the topic has partitions.

root@kasun-VirtualBox:~# /home/kafka/bin/kafka-topics.sh --create --topic TestTopic  --replication-factor 1   --partitions 2  --zookeeper localhost:2181
Created topic TestTopic.

Check with one consumer

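Let’s start one consumer with rebar3 shell and check how data written to TestTopic is read by it. As the only member of the group, it is elected group leader (elected=true) and is assigned both partitions.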

root@kasun-VirtualBox:/home/kasun/demoApp1# rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling demoApp1
Erlang/OTP 22 [erts-10.6.4] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1]
Eshell V10.6.4 (abort with ^G)
1> Starting Kafka Client ok
===> Booted snappyer
===> Booted crc32cer
===> Booted kafka_protocol
===> Booted supervisor3
===> Booted brod
===> Booted demoApp1
===> Booted sasl
=INFO REPORT==== 21-Mar-2021::20:59:29.039196 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
elected=true
=INFO REPORT==== 21-Mar-2021::20:59:29.170970 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
assignments received:
TestTopic:
partition=0 begin_offset=undefined
partition=1 begin_offset=undefined
=INFO REPORT==== 21-Mar-2021::20:59:29.177380 ===
client client1 connected to kasun-VirtualBox:9092
root@kasun-VirtualBox:~# echo "Hello, World" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
root@kasun-VirtualBox:~#
1> Kafka message {kafka_message,0,<<>>,<<"Hello, World 1">>,create,1616344333665,
[]}
Kafka message {kafka_message,1,<<>>,<<"Hello, World 2">>,create,1616344336740,
[]}

Check with two consumers

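Now let’s copy the demoApp1 code to another location, demoApp2, and start it. Because it uses the same group id (test_group_id), it joins the same consumer group. As you can see, once demoApp2 starts, the group rebalances and the partitions are reassigned:

  • demoApp1 is listening to: partition 0
  • demoApp2 is listening to: partition 1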
=INFO REPORT==== 21-Mar-2021::21:24:04.086674 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
re-joining group, reason:rebalance_in_progress
=INFO REPORT==== 21-Mar-2021::21:24:04.091575 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
elected=true
=INFO REPORT==== 21-Mar-2021::21:24:04.104967 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
assignments received:
TestTopic:
partition=0 begin_offset=1
=INFO REPORT==== 21-Mar-2021::21:24:04.093209 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
elected=false
=INFO REPORT==== 21-Mar-2021::21:24:04.105420 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
assignments received:
TestTopic:
partition=1 begin_offset=1
=INFO REPORT==== 21-Mar-2021::21:24:04.117680 ===
client client1 connected to kasun-VirtualBox:9092
root@kasun-VirtualBox:~# echo "Hello, World 1" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
root@kasun-VirtualBox:~# echo "Hello, World 2" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
1> 
1> Kafka message {kafka_message,1,<<>>,<<"Hello, World 2">>,create,1616343773113,
[]}
1> 
1> Kafka message {kafka_message,1,<<>>,<<"Hello, World 1">>,create,1616343773113,
[]}

Conclusion

So, as shown above, we can easily create a scalable Kafka consumer in Erlang using the brod library. The biggest advantage is that operations like group leader election and partition reassignment are handled by Kafka’s consumer group protocol, with brod implementing the client side. There is no need to handle them on the application side.

Software Engineer | Studied Computer Science & Engineering at University of Moratuwa, Sri Lanka | Mobile and Web Full Stack Developer | Erlang Developer
