Erlang: How to Develop a Scalable Kafka Consumer

Kasun Edward
Mar 21, 2021

Apache Kafka is an open-source event streaming platform, written in Java and Scala, that is used to build scalable, high-throughput, fault-tolerant systems for handling real-time data feeds. If you are unfamiliar with Kafka, the official Kafka guide gives a complete introduction.

Scaling the consumer with the Consumer Group concept

Scaling the consumer app is simple with the Kafka consumer group concept. Kafka itself balances the load across the consumers in a group, so the application does not have to. Let’s see how it works.

In Kafka, data is stored in topics, and a topic can be created with several partitions. When a producer writes a message to a topic, the message is stored in one of its partitions, by default in a round-robin manner (the application can also select a specific partition). If the group has a single consumer, it reads the messages from all partitions of the topic. See the diagram below for a better understanding.

One consumer in consumer group

When more consumers are added to the consumer group, the partitions are reassigned among the consumers to share the load, as shown in the diagrams below.

Two consumers in consumer group
Three consumers in consumer group

What happens if another consumer is added to the consumer group? Within a group, each partition is assigned to only one consumer. So the first three consumers are each assigned one of the three partitions, and the remaining consumer stays idle, as shown in the diagram below.

Four consumers in consumer group

So, when designing a solution, we need to consider the number of partitions in a topic, since it sets the upper limit on how many consumers in a group can work in parallel.
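To make the effect of the partition count concrete, here is a small, self-contained sketch that spreads partitions over consumers in a round-robin fashion. It is only a toy model: the module and function names are made up for illustration, and the real assignment is computed by the consumer group protocol, so which partition lands on which consumer may differ.

```erlang
-module(assignment_sketch).
-export([assign/2]).

%% Toy model of sharing partitions among the consumers of one group.
%% Returns a list of {Consumer, [Partition]} pairs, one per consumer.
assign(Partitions, Consumers) ->
    N = length(Consumers),
    %% Number the partitions 0..P-1 so they can be dealt out in turn.
    Indexed = lists:zip(lists:seq(0, length(Partitions) - 1), Partitions),
    [{C, [P || {I, P} <- Indexed, I rem N =:= Idx]}
     || {Idx, C} <- lists:zip(lists:seq(0, N - 1), Consumers)].

%% assignment_sketch:assign([0,1,2], [c1]).
%%   -> [{c1,[0,1,2]}]
%% assignment_sketch:assign([0,1,2], [c1,c2,c3,c4]).
%%   -> [{c1,[0]},{c2,[1]},{c3,[2]},{c4,[]}]
```

With three partitions and four consumers, the fourth consumer ends up with an empty list, which is the idle consumer from the last diagram.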

Now let’s move on to developing the Kafka consumer in Erlang.

Prerequisites

You need to set up Erlang, Kafka and Rebar3 in your development environment. I’m not going to describe the installation steps in detail since they vary depending on your environment; refer to the official installation guides.

Code Development

Creating the Project and Adding Erlang-Kafka library

You can create the Erlang project using the rebar3 new release command.

$ rebar3 new release demoApp1
===> Writing apps/demoApp1/src/demoApp1_app.erl
===> Writing apps/demoApp1/src/demoApp1_sup.erl
===> Writing apps/demoApp1/src/demoApp1.app.src
===> Writing rebar.config
===> Writing config/sys.config
===> Writing config/vm.args
===> Writing .gitignore
===> Writing LICENSE
===> Writing README.md

After that, we add brod, an Apache Kafka client for Erlang, to the project. It is an Erlang implementation of the Apache Kafka protocol and supports both producers and consumers. The relevant configuration goes into the rebar.config and demoApp1.app.src files.

rebar.config
demoApp1.app.src
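The contents of the two files are not reproduced here, so the following is only a sketch of what the relevant parts might look like. It assumes brod is fetched from Hex without a pinned version (pinning the version you tested with is usually advisable) and leaves the other entries generated by the rebar3 template untouched.

```erlang
%% rebar.config (relevant part only)
{erl_opts, [debug_info]}.
{deps, [brod]}.   % assumption: latest brod from Hex, no version pinned
```

```erlang
%% apps/demoApp1/src/demoApp1.app.src
{application, demoApp1,
 [{description, "An OTP application"},
  {vsn, "0.1.0"},
  {registered, []},
  {mod, {demoApp1_app, []}},
  %% Listing brod here makes sure it is started before demoApp1.
  {applications, [kernel, stdlib, brod]},
  {env, []},
  {modules, []}
 ]}.
```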

Adding a Kafka Client to the app

The brod library has a dedicated process type called brod_client. It is a gen_server whose primary responsibility is to establish TCP connections to the Kafka brokers and maintain them. Once a client is started, it can be used anywhere in the application by producers and consumers. See the code below for the client implementation.

kafka_client_svr.erl
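The embedded source is not reproduced here. As a minimal sketch of such a client server, the module below starts a brod client from a gen_server. The broker address localhost:9092 and the module layout are assumptions for illustration; client1 is the client id used in the rest of the article.

```erlang
-module(kafka_client_svr).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Assumed values: adjust them to your broker and preferred client id.
-define(BOOTSTRAP, [{"localhost", 9092}]).
-define(CLIENT_ID, client1).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    %% brod:start_client/3 registers the client under the given id,
    %% so other processes can refer to it simply as client1.
    case brod:start_client(?BOOTSTRAP, ?CLIENT_ID, []) of
        ok ->
            io:format("Starting Kafka Client ok~n"),
            {ok, #{client => ?CLIENT_ID}};
        {error, Reason} ->
            {stop, {kafka_client_failed, Reason}}
    end.

%% No requests are handled in this sketch; the callbacks only satisfy
%% the gen_server behaviour.
handle_call(_Request, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
```

Stopping the server with an error when the client cannot be started lets the supervisor decide whether to retry, instead of running the application without a Kafka connection.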

Now we can use client1 inside the application to handle producers and consumers.

Consumer logic development

There are two main points in the consumer logic:

  • In the init() function, call brod:start_link_group_subscriber. Pass the Kafka client (client1) that we added earlier as a parameter. Another important parameter is GroupId: the id shared by all the consumers in a consumer group. Kafka uses it to identify that the consumers belong to the same group.
  • Add a callback function to process the received Kafka messages. Once we subscribe to the topic in init(), brod_group_subscriber listens to that topic continuously and calls the callback function with each message that arrives.

Please refer to the implementation below.

kafka_consumer_svr.erl
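The embedded source is not reproduced here. The two points above could be sketched as follows, assuming brod:start_link_group_subscriber/7 and the brod_group_subscriber callbacks init/2 and handle_message/4. The group id and topic are taken from the test output later in the article; the group and consumer options are illustrative choices, not necessarily the ones the original code used.

```erlang
-module(kafka_consumer_svr).
-behaviour(gen_server).
-behaviour(brod_group_subscriber).

-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
%% brod_group_subscriber callbacks
-export([init/2, handle_message/4]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% gen_server init: join the consumer group through the shared client.
init([]) ->
    GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                   {offset_commit_interval_seconds, 5}],
    ConsumerConfig = [{begin_offset, earliest}],
    {ok, Subscriber} = brod:start_link_group_subscriber(
                         client1,              % client started earlier
                         <<"test_group_id">>,  % GroupId shared by the group
                         [<<"TestTopic">>],
                         GroupConfig, ConsumerConfig,
                         ?MODULE, []),         % callback module and its init arg
    {ok, #{subscriber => Subscriber}}.

handle_call(_Request, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.

%% brod_group_subscriber init: runs inside the subscriber process.
init(_GroupId, _InitArg) ->
    {ok, #{}}.

%% Called once per received message; 'ack' marks the message as
%% processed so its offset can be committed.
handle_message(_Topic, _Partition, Message, State) ->
    io:format("Kafka message ~p~n", [Message]),
    {ok, ack, State}.
```

Both behaviours define a callback named init, but with different arities (init/1 for gen_server, init/2 for brod_group_subscriber), so they can live in the same module.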

Adding client and consumer servers to supervisor

As the final step of the code development, let’s add both the client and consumer servers to the application supervisor.

demoApp1_sup.erl
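The embedded source is not reproduced here. A supervisor along these lines would start the client before the consumer. The rest_for_one strategy and the restart limits are assumptions chosen for this sketch, so that a restart of the client server also restarts the consumer server that depends on it.

```erlang
-module(demoApp1_sup).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% Children are started in list order: client first, then consumer.
    SupFlags = #{strategy => rest_for_one, intensity => 5, period => 10},
    ChildSpecs = [
        #{id => kafka_client_svr,
          start => {kafka_client_svr, start_link, []}},
        #{id => kafka_consumer_svr,
          start => {kafka_consumer_svr, start_link, []}}
    ],
    {ok, {SupFlags, ChildSpecs}}.
```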

You can find the complete code at the link below.

Testing

Here comes the most awaited and interesting part. Let’s check how scalability is handled in our solution.

Create Kafka topic

First, let’s create our Kafka topic, TestTopic. To test the scalability of the consumer, let’s create it with two partitions.

root@kasun-VirtualBox:~# /home/kafka/bin/kafka-topics.sh --create --topic TestTopic  --replication-factor 1   --partitions 2  --zookeeper localhost:2181
Created topic TestTopic.

Check with one consumer

Let’s start one consumer and check how data written to the TestTopic is read by that consumer.

Go to the demoApp1 directory and start the application using the rebar3 shell command as below.

root@kasun-VirtualBox:/home/kasun/demoApp1# rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling demoApp1
Erlang/OTP 22 [erts-10.6.4] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1]
Eshell V10.6.4 (abort with ^G)
1> Starting Kafka Client ok
===> Booted snappyer
===> Booted crc32cer
===> Booted kafka_protocol
===> Booted supervisor3
===> Booted brod
===> Booted demoApp1
===> Booted sasl
=INFO REPORT==== 21-Mar-2021::20:59:29.039196 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
elected=true
=INFO REPORT==== 21-Mar-2021::20:59:29.170970 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
assignments received:
TestTopic:
partition=0 begin_offset=undefined
partition=1 begin_offset=undefined
=INFO REPORT==== 21-Mar-2021::20:59:29.177380 ===
client client1 connected to kasun-VirtualBox:9092

Now write to the topic as below.

root@kasun-VirtualBox:~# echo "Hello, World" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
root@kasun-VirtualBox:~#

When we check demoApp1 shell, we can see that data has been read by the consumer.

1> Kafka message {kafka_message,0,<<>>,<<"Hello, World 1">>,create,1616344333665,
[]}
Kafka message {kafka_message,1,<<>>,<<"Hello, World 2">>,create,1616344336740,
[]}
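The printed tuple is a brod kafka_message record. As a reading aid, the sketch below names its fields, assuming the record layout of brod's 3.x line (offset, key, value, timestamp type, timestamp, headers); the module and function names are made up for illustration.

```erlang
-module(kafka_message_sketch).
-export([to_map/1]).

%% Turns the raw record tuple into a map with named fields.
%% Assumed field order: offset, key, value, ts_type, ts, headers.
to_map({kafka_message, Offset, Key, Value, TsType, Ts, Headers}) ->
    #{offset => Offset,     % position of the message within its partition
      key => Key,
      value => Value,
      ts_type => TsType,    % e.g. create: timestamp set at creation time
      ts => Ts,             % milliseconds since the Unix epoch
      headers => Headers}.

%% kafka_message_sketch:to_map(
%%   {kafka_message,0,<<>>,<<"Hello, World 1">>,create,1616344333665,[]}).
```

Under this layout, the integer right after kafka_message is the message offset, not the partition number; the partition is passed to the callback as a separate argument.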

Check with two consumers

Now let’s copy the demoApp1 code to another location, demoApp2, and start it. As you can see, the partitions are reassigned as soon as demoApp2 starts.

Now:

  • demoApp1 is listening to: partition 0
  • demoApp2 is listening to: partition 1

demoApp1

=INFO REPORT==== 21-Mar-2021::21:24:04.086674 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=1):
re-joining group, reason:rebalance_in_progress
=INFO REPORT==== 21-Mar-2021::21:24:04.091575 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
elected=true
=INFO REPORT==== 21-Mar-2021::21:24:04.104967 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
assignments received:
TestTopic:
partition=0 begin_offset=1

demoApp2

=INFO REPORT==== 21-Mar-2021::21:24:04.093209 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
elected=false
=INFO REPORT==== 21-Mar-2021::21:24:04.105420 ===
Group member (test_group_id,coor=<0.213.0>,cb=<0.212.0>,generation=2):
assignments received:
TestTopic:
partition=1 begin_offset=1
=INFO REPORT==== 21-Mar-2021::21:24:04.117680 ===
client client1 connected to kasun-VirtualBox:9092

Now let’s write some data to the Kafka topic and check each consumer’s output.

root@kasun-VirtualBox:~# echo "Hello, World 1" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
root@kasun-VirtualBox:~# echo "Hello, World 2" | /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null

Checking the demoApp1 and demoApp2 shell outputs, we can see that the data has been distributed across the two partitions and each message has been read by the consumer assigned to that partition.

demoApp1

1> 
1> Kafka message {kafka_message,1,<<>>,<<"Hello, World 2">>,create,1616343773113,
[]}

demoApp2

1> 
1> Kafka message {kafka_message,1,<<>>,<<"Hello, World 1">>,create,1616343773113,
[]}

Conclusion

So, as shown above, we can easily create a scalable Kafka consumer in Erlang using the brod library. The biggest advantage is that operations like leader election and partition reassignment are taken care of by the Kafka consumer group mechanism, so there is no need to handle them on the application side.

I hope this gave you a basic understanding of how to develop a scalable Kafka consumer in Erlang. I look forward to your comments on the article.

Thanks for reading and happy coding 😍

Kasun Edward

Software Engineer | Studied Computer Science & Engineering at University of Moratuwa, Sri Lanka | Mobile and Web Full Stack Developer | Erlang Developer