What is Apache Kafka
Apache Kafka is a distributed streaming platform. It provides the following three key capabilities:
- Publish and subscribe to streams of records
- Store streams of records in a fault-tolerant way
- Let applications process streams of records as they occur
There are two broad categories of applications where Kafka can be used:
- Building real-time, fault-tolerant streaming data pipelines.
- Building real-time, fault-tolerant streaming applications.
Installing Kafka and its dependencies
Kafka depends on the Java Runtime and Zookeeper. Hence, before I install Kafka, I have to install JRE 8 and Zookeeper. Also, the Zookeeper and Kafka downloads are gzip-compressed archives. As a result, 7zip also needs to be installed to extract these files.
7zip Installation
To install 7zip, I downloaded the version “7-Zip for 64-bit Windows x64 (Intel 64 or AMD64)” from the website http://www.7-zip.org/download.html and installed it.
JRE 8 Installation
I downloaded and installed the Windows x64 Offline version from the Oracle website http://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html . After the JRE was installed, I created a system environment variable named “JAVA_HOME” and set it to the path of the JRE install. I also updated the system environment variable Path and appended “%JAVA_HOME%\bin” to the end. This enables the use of the java command from any path in the command prompt.
Zookeeper Installation
After 7zip and the JRE were installed, I downloaded the latest version of Zookeeper from the Zookeeper website http://zookeeper.apache.org/releases.html . After downloading, I extracted the downloaded file into the “C:\zookeeper-3.4.10” folder using 7zip.
Furthermore, I renamed the file “zoo_sample.cfg” inside the folder “C:\zookeeper-3.4.10\conf” to “zoo.cfg”. Since I am not using anything fancy here, I am going to use the default configuration. The only thing I changed in the “zoo.cfg” file is the location of the “dataDir”. I changed it from the default value to “/data”.
Finally, I added a new system environment variable “ZOOKEEPER_HOME” with the path to the Zookeeper installation folder “C:\zookeeper-3.4.10”. I also updated the Path system environment variable to append “%ZOOKEEPER_HOME%\bin” to the end.
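For reference, after this change the “zoo.cfg” file would look roughly like the fragment below. This is only a sketch: apart from “dataDir”, the values are the defaults I would expect to find in the 3.4.x sample file, so treat them as an assumption and check them against your own copy.

```properties
# zoo.cfg (sketch; only dataDir differs from the sample file)
tickTime=2000
initLimit=10
syncLimit=5
# Changed from the sample default. A path without a drive letter is
# typically resolved against the current drive on Windows.
dataDir=/data
# Port that the Kafka commands in this post connect to (localhost:2181)
clientPort=2181
```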
Installing Kafka
From the Kafka website http://kafka.apache.org/downloads.html I downloaded the “Scala 2.11 - kafka_2.11-1.0.0.tgz (asc, sha512)” binary file. After downloading, I extracted the downloaded file into “C:\Kafka”.
In addition, I edited the “server.properties” file in the “config” folder and updated “log.dirs” to use “/kafka_logs” instead of the default folder.
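As a rough sketch of the result, the relevant lines of “server.properties” might look like the fragment below. The key names and the other two values are what I would expect in the file shipped with Kafka 1.0.0 (where the log folder key is spelled “log.dirs”); this is an assumption to verify against your own file, and unrelated settings are omitted.

```properties
# config/server.properties (sketch; unrelated settings omitted)
broker.id=0
# Changed from the shipped default folder
log.dirs=/kafka_logs
# Must match the clientPort configured for Zookeeper
zookeeper.connect=localhost:2181
```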
Running Kafka
1) Start Zookeeper Instance
Open a command prompt, navigate to “C:\zookeeper-3.4.10\bin”, type the command “zkserver” and hit enter.
or
From an already open command prompt, start it in a new window:
start cmd.exe /k "cd C:\zookeeper-3.4.10\bin & c: & zkserver"
2) Start Kafka Instance
Open a command prompt, navigate to “c:\kafka\kafka_2.11-1.0.0\”, type the command “.\bin\windows\kafka-server-start.bat ./config/server.properties” and hit enter.
or
From an already open command prompt, start it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\ & c: & .\bin\windows\kafka-server-start.bat ./config/server.properties"
3) Create Kafka Topic
Open a command prompt and navigate to “c:\kafka\kafka_2.11-1.0.0\bin\windows”.
Delete the topic if it already exists:
kafka-topics.bat --delete --zookeeper localhost:2181 --topic timemanagement_booking
Create the topic by typing the following command and hitting enter:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic timemanagement_booking
or
From an already open command prompt, start it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\bin\windows & c: & kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic timemanagement_booking"
As a result, a new topic named “timemanagement_booking” will be created.
Note - The most noteworthy property in the command is “replication-factor”, which determines how many brokers each partition of the topic is replicated to. Since I have only one Kafka instance installed on my PC, I use 1 for “replication-factor”.
4) Testing Kafka using the built-in Producer/Consumer
Kafka Producer
Open a command prompt, navigate to “c:\kafka\kafka_2.11-1.0.0\bin\windows”, type the command “kafka-console-producer.bat --broker-list localhost:9092 --topic timemanagement_booking” and hit enter.
or
From an already open command prompt, start it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\bin\windows & c: & kafka-console-producer.bat --broker-list localhost:9092 --topic timemanagement_booking"
As a result, a new instance of the producer will be started.
Kafka Consumer
Open a command prompt, navigate to “c:\kafka\kafka_2.11-1.0.0\bin\windows”, type the command “kafka-console-consumer.bat --zookeeper localhost:2181 --topic timemanagement_booking” and hit enter.
or
From an already open command prompt, start it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\bin\windows & c: & kafka-console-consumer.bat --zookeeper localhost:2181 --topic timemanagement_booking"
As a result, a new instance of the consumer will be started.
Creating .Net Core Consumer
First of all, I will open the TimeManagement application solution in Visual Studio 2017. After the solution is opened, I am going to right-click on the solution and select “Add” -> “New Project”. This will open the New Project modal window. In the New Project modal window, I am going to select “.Net Core” -> “Console App (.NET Core)”, name the project “TimeManagement.Streaming.Consumer” and click “OK”.
Adding NuGet Package
After the project is created, I will install the NuGet package for Kafka, “Confluent.Kafka”. Hence, I will right-click on the project and open the Manage NuGet Packages option. In the Package Manager window, I will go to the Browse tab, search for Confluent.Kafka and install it.
IBookingConsumer
After the NuGet package is installed, I will create a new interface, IBookingConsumer, which will have the contract for listening to the Kafka stream. It will have a single method, Listen, which returns void and takes an Action delegate with string as its single input parameter.
using System;

public interface IBookingConsumer
{
    // Invokes the supplied callback with the value of every message received.
    void Listen(Action<string> message);
}
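To illustrate the contract without a running broker, here is a minimal sketch with a hypothetical in-memory implementation. The names InMemoryBookingConsumer and Demo are made up for illustration only; they are not part of the TimeManagement project or of Confluent.Kafka.

```csharp
using System;
using System.Collections.Generic;

public interface IBookingConsumer
{
    void Listen(Action<string> message);
}

// Hypothetical stand-in that replays a fixed list of messages
// instead of reading them from Kafka.
public class InMemoryBookingConsumer : IBookingConsumer
{
    private readonly IEnumerable<string> _messages;

    public InMemoryBookingConsumer(IEnumerable<string> messages)
    {
        _messages = messages;
    }

    public void Listen(Action<string> message)
    {
        // Hand each stored message to the caller's callback, in order.
        foreach (var m in _messages)
        {
            message(m);
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        IBookingConsumer consumer =
            new InMemoryBookingConsumer(new[] { "booking 1", "booking 2" });

        // Any method that accepts a string can serve as the callback.
        consumer.Listen(Console.WriteLine);
    }
}
```

Because callers depend only on the interface, the Kafka-backed implementation can be swapped for a stand-in like this in unit tests.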
BookingConsumer
Finally, I will create the implementation class, BookingConsumer, which will connect to and listen to the Kafka stream. BookingConsumer will implement the IBookingConsumer interface.
Furthermore, inside the BookingConsumer class, I will import the Confluent.Kafka and Confluent.Kafka.Serialization namespaces, because I need them to access the Kafka API.
For the implementation of the Listen method, I will first create a configuration object, which is an instance of Dictionary with string keys and object values. This configuration object will be passed to the constructor of the Consumer class of the Confluent.Kafka assembly.
After creating the Consumer instance, I will Subscribe to the “timemanagement_booking” topic and attach a callback to the OnMessage event. When the event occurs, I will call back the caller of the function, passing the value from the Kafka stream. Finally, I will create a while loop in which I poll Kafka, with each Poll call waiting up to 100 milliseconds for new messages.
using Confluent.Kafka;
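using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Text;

namespace TimeManagement.Streaming.Consumer
{
    public class BookingConsumer : IBookingConsumer
    {
        public void Listen(Action<string> message)
        {
            // Consumer settings: consumer group, broker address and offset handling.
            // Auto-commit is disabled and this class never commits offsets itself.
            var config = new Dictionary<string, object>
            {
                { "group.id", "booking_consumer" },
                { "bootstrap.servers", "localhost:9092" },
                { "enable.auto.commit", "false" }
            };

            // Keys are ignored (Null); values are deserialized as UTF-8 strings.
            using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                consumer.Subscribe("timemanagement_booking");

                // Forward every received value to the caller's callback.
                consumer.OnMessage += (_, msg) =>
                {
                    message(msg.Value);
                };

                // Poll forever; each call waits up to 100 ms for new messages.
                while (true)
                {
                    consumer.Poll(100);
                }
            }
        }
    }
}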
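The loop above never exits and does not report client errors. As a hedged sketch of one way to address both, the variant below stops polling when a CancellationToken is cancelled and logs errors through the OnError event. It assumes the same 0.11.x generation of Confluent.Kafka that the Confluent.Kafka.Serialization namespace implies; the class name CancellableBookingConsumer and the token parameter are hypothetical and not part of the original project, so verify the event names against the package version you installed.

```csharp
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace TimeManagement.Streaming.Consumer
{
    // Hypothetical variant of BookingConsumer, shown for illustration only.
    public class CancellableBookingConsumer
    {
        public void Listen(Action<string> message, CancellationToken token)
        {
            // Same settings as BookingConsumer.
            var config = new Dictionary<string, object>
            {
                { "group.id", "booking_consumer" },
                { "bootstrap.servers", "localhost:9092" },
                { "enable.auto.commit", "false" }
            };

            using (var consumer = new Consumer<Null, string>(
                config, null, new StringDeserializer(Encoding.UTF8)))
            {
                consumer.OnMessage += (_, msg) => message(msg.Value);

                // Assumption: OnError is raised for client-level errors
                // such as an unreachable broker.
                consumer.OnError += (_, error) =>
                    Console.Error.WriteLine($"Kafka error: {error}");

                consumer.Subscribe("timemanagement_booking");

                // Poll until the caller cancels; the using block then
                // disposes the consumer.
                while (!token.IsCancellationRequested)
                {
                    consumer.Poll(100);
                }
            }
        }
    }
}
```

A caller could create a CancellationTokenSource, cancel it from a Console.CancelKeyPress handler, and pass its Token to Listen so that Ctrl+C ends the loop cleanly.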
Running .Net Core Kafka Consumer
Now that the .Net Core Kafka consumer code is complete, I will implement the Main method of Program.cs. Inside the Main method, I will create a new instance of BookingConsumer and call its Listen method, passing the Console.WriteLine method. Finally, I will set TimeManagement.Streaming.Consumer as the startup project and press Ctrl+F5 to run the application.
using System;

class Program
{
    static void Main(string[] args)
    {
        // Print every message received from the Kafka topic to the console.
        var bookingConsumer = new BookingConsumer();
        bookingConsumer.Listen(Console.WriteLine);
    }
}
Testing .Net Core Kafka Consumer
Since my .Net Core Kafka consumer is now running, I will open the command prompt where the built-in producer is running. Then I will type the message “Hello booking consumer” and press enter. As a result, I will see the message appear immediately in the .Net Core consumer console.
References
JRE Download: http://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html
7zip Download: http://www.7-zip.org/
Zookeeper Download: https://www.apache.org/dyn/closer.cgi/zookeeper/
Kafka Download: https://kafka.apache.org/downloads
Confluent.Kafka NuGet: https://github.com/confluentinc/confluent-kafka-dotnet/