Thursday, March 21, 2019

ASP.Net Core Streaming Application Using Kafka


What is Apache Kafka
Apache Kafka is a distributed streaming platform. It provides three key capabilities:
  1. Publish and subscribe to streams of records
  2. Store streams of records in a fault-tolerant way
  3. Let applications process streams of records as they occur
Kafka runs as a cluster on one or more servers. Records are stored in categories called topics, and each record has a key, a value and a timestamp.
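As a rough mental model only (this is not how Kafka stores data internally), a topic can be pictured as a named, append-only list of records. The type and member names in this sketch are hypothetical and exist purely for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration type: one record with the three parts named above.
public class Record
{
    public string Key { get; }
    public string Value { get; }
    public DateTime Timestamp { get; }

    public Record(string key, string value, DateTime timestamp)
    {
        Key = key;
        Value = value;
        Timestamp = timestamp;
    }
}

// Hypothetical illustration type: a topic as a named, append-only sequence of records.
public class Topic
{
    private readonly List<Record> _records = new List<Record>();

    public string Name { get; }
    public IReadOnlyList<Record> Records => _records;

    public Topic(string name)
    {
        Name = name;
    }

    // Appends a record and returns its zero-based position in the topic.
    public int Append(Record record)
    {
        _records.Add(record);
        return _records.Count - 1;
    }
}
```

Records are only ever added at the end, which is why consumers can read a topic in order and come back later to continue from a known position.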
Kafka is used for two broad categories of applications:
  1. Building real-time, fault-tolerant streaming data pipelines
  2. Building real-time, fault-tolerant streaming applications
Kafka has four core APIs: Producer, Consumer, Streams and Connector. In this post I focus only on Producer and Consumer: I use the built-in console producer and create a .Net Core consumer. In my next post, I will create a .Net Core producer.
Installing Kafka and its dependencies
Kafka depends on the Java Runtime and Zookeeper, so before installing Kafka I have to install JRE 8 and Zookeeper. Zookeeper and Kafka are both distributed as gzip-compressed tar archives, so 7zip also needs to be installed to extract these files.
7zip Installation
To install 7zip, I downloaded the version “7-Zip for 64-bit Windows x64 (Intel 64 or AMD64)” from the website http://www.7-zip.org/download.html and installed it.
JRE 8 Installation
I downloaded and installed the Windows x64 Offline version from the Oracle website http://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html .
After the JRE was installed, I created a system environment variable named “JAVA_HOME” and set it to the path of the JRE install. I also updated the system environment variable Path, appending “%JAVA_HOME%\bin” to the end. This makes the java command available from any folder in the command prompt.
Zookeeper Installation
After 7zip and the JRE were installed, I downloaded the latest version of Zookeeper from the Zookeeper website http://zookeeper.apache.org/releases.html . After downloading, I extracted the file into the “C:\zookeeper-3.4.10” folder using 7zip.
Next, I renamed the file “zoo_sample.cfg” inside the folder “C:\zookeeper-3.4.10\conf” to “zoo.cfg”. Since I am not doing anything fancy here, I use the default configuration. The only thing I changed in “zoo.cfg” is the location of “dataDir”, which I changed from its default value to “/data”.
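For reference, the resulting “zoo.cfg” might look roughly like the fragment below. Only the “dataDir” value comes from this post. The other lines are the defaults I would expect to find in “zoo_sample.cfg” for this Zookeeper version, so check them against your own copy rather than pasting them in.

```properties
# Basic time unit in milliseconds used by Zookeeper
tickTime=2000
# Ticks allowed for the initial synchronization phase
initLimit=10
# Ticks allowed between sending a request and getting an acknowledgement
syncLimit=5
# Directory where snapshots are stored (changed from the default)
dataDir=/data
# Port on which clients connect
clientPort=2181
```

On Windows, a path such as “/data” is typically resolved against the root of the drive Zookeeper is started from.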
Finally, I added a new system environment variable “ZOOKEEPER_HOME” with the path to the Zookeeper installation folder “C:\zookeeper-3.4.10”. I also updated the Path system environment variable to append “%ZOOKEEPER_HOME%\bin” to the end.
Installing Kafka
From the Kafka website http://kafka.apache.org/downloads.html I downloaded the “Scala 2.11 - kafka_2.11-1.0.0.tgz (asc, sha512)” binary file. After downloading, I extracted the file into “C:\Kafka”.
In addition, I edited the “server.properties” file in the “config” folder and updated “log.dirs” to use “/kafka_logs” instead of the default folder.
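As a small illustration, the edited part of “config\server.properties” might look like the fragment below. Note that the file shipped with Kafka names the setting “log.dirs”. The “zookeeper.connect” line is not changed in this post; it is shown only because it is the setting that ties the broker to the Zookeeper instance installed earlier, and its value here is the default I would expect in the shipped file.

```properties
# Directory where the broker stores its log segments (the topic data)
log.dirs=/kafka_logs
# Zookeeper connection string used by the broker
zookeeper.connect=localhost:2181
```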

Running Kafka

1) Start Zookeeper Instance

Open a command prompt, navigate to “C:\zookeeper-3.4.10\bin”, type the command “zkserver” and hit enter.

or

Start it in a new command prompt window with a single command (adjust the path so that it matches your Zookeeper install folder):
start cmd.exe /k "cd C:\zookeeper\zookeeper-3.4.13\bin & c: & zkserver"



2) Start Kafka instance

Open a command prompt, navigate to “c:\kafka\kafka_2.11-1.0.0\”, type the following command and hit enter:
.\bin\windows\kafka-server-start.bat ./config/server.properties

or

From the command prompt that is already open, start it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\ & c: & .\bin\windows\kafka-server-start.bat ./config/server.properties"



3) Create Kafka Topic

Open a command prompt and navigate to “c:\kafka\kafka_2.11-1.0.0\bin\windows”.

Delete the topic if it already exists:
kafka-topics.bat --delete --zookeeper localhost:2181 --topic timemanagement_booking

Create the topic by typing the following command and hitting enter:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic timemanagement_booking

or

From the command prompt that is already open, run it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\bin\windows & c: & kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic timemanagement_booking"

As a result, a new topic named “timemanagement_booking” will be created.

Note - The most noteworthy property in the command is “replication-factor”, which determines how many brokers each partition of the topic is replicated to. Since I have only one Kafka instance installed on my PC, I use 1 for “replication-factor”.
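The same topic could also be created from .Net code instead of the command line. The sketch below assumes version 1.0 or later of the Confluent.Kafka Nuget package, which is newer than the API used for the consumer in this post, and a broker listening on localhost:9092. The class name CreateTopicSketch is made up for this example.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

class CreateTopicSketch
{
    static void Main(string[] args)
    {
        // Assumption: the Kafka broker started in step 2 listens on localhost:9092.
        var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };

        using (var adminClient = new AdminClientBuilder(config).Build())
        {
            try
            {
                // One partition and replication factor 1, matching the command above.
                adminClient.CreateTopicsAsync(new[]
                {
                    new TopicSpecification
                    {
                        Name = "timemanagement_booking",
                        NumPartitions = 1,
                        ReplicationFactor = 1
                    }
                }).GetAwaiter().GetResult();

                Console.WriteLine("Topic created.");
            }
            catch (CreateTopicsException e)
            {
                // Raised, for example, when the topic already exists.
                Console.WriteLine($"Could not create topic: {e.Results[0].Error.Reason}");
            }
        }
    }
}
```

Unlike the command line tool, which is pointed at Zookeeper here, the admin client talks to the broker directly.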


4) Testing Kafka using inbuilt Producer/Consumer

Kafka Producer
Open a command prompt, navigate to “c:\kafka\kafka_2.11-1.0.0\bin\windows”, type the following command and hit enter:
kafka-console-producer.bat --broker-list localhost:9092 --topic timemanagement_booking

or

From the command prompt that is already open, run it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\bin\windows & c: & kafka-console-producer.bat --broker-list localhost:9092 --topic timemanagement_booking"

As a result, a new instance of the producer will be started.

Kafka Consumer
Open a command prompt, navigate to “c:\kafka\kafka_2.11-1.0.0\bin\windows”, type the following command and hit enter:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic timemanagement_booking

or

From the command prompt that is already open, run it in a new window:
start cmd.exe /k "cd c:\kafka\kafka_2.11-1.0.0\bin\windows & c: & kafka-console-consumer.bat --zookeeper localhost:2181 --topic timemanagement_booking"

As a result, a new instance of the consumer will be started.


Creating .Net Core Consumer
First I will open the TimeManagement application solution in Visual Studio 2017. After the solution is opened, I right click on the solution and select “Add” -> “New Project”. This opens the New Project dialog. In the dialog, I select “.Net Core” -> “Console App (.NET Core)”, name the project “TimeManagement.Streaming.Consumer” and click “OK”.




Adding Nuget Package
After the project is created, I will install the Nuget package for Kafka, “Confluent.Kafka”. I right click on the project and open the Manage Nuget Packages option. In the Package Manager window, I go to the Browse tab, search for Confluent.Kafka and install it.



IBookingConsumer
After the Nuget package is installed, I will create a new interface, IBookingConsumer, which holds the contract for listening to the Kafka stream. It has a single method, Listen, which returns void and takes an Action delegate with a string as its single input parameter.

using System;

public interface IBookingConsumer
{
    void Listen(Action<string> message);
}
BookingConsumer
Finally, I will create the implementation class BookingConsumer, which implements the IBookingConsumer interface and connects and listens to the Kafka stream.
Inside the BookingConsumer class, I import the Confluent.Kafka and Confluent.Kafka.Serialization namespaces, because I need them for accessing the Kafka API. These namespaces reflect the pre-1.0 releases of Confluent.Kafka; the Serialization namespace and the OnMessage event used here were removed in version 1.0.
For the implementation of the Listen method, I first create a configuration object, which is a Dictionary with string keys and object values. This configuration object is passed to the constructor of the Consumer class of the Confluent.Kafka assembly.
After creating the Consumer instance, I subscribe to the “timemanagement_booking” topic and attach a callback to the OnMessage event. When the event occurs, I call back the caller of the function, passing the value from the Kafka stream. Finally, I create a while loop that polls Kafka continuously, with each Poll call waiting up to 100 milliseconds for new events.

using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Text;

namespace TimeManagement.Streaming.Consumer
{
    public class BookingConsumer : IBookingConsumer
    {
        public void Listen(Action<string> message)
        {
            // Consumer group, broker address, and no automatic offset commits.
            var config = new Dictionary<string, object>
            {
                { "group.id", "booking_consumer" },
                { "bootstrap.servers", "localhost:9092" },
                { "enable.auto.commit", "false" }
            };

            // Keys are ignored (Null); values are deserialized as UTF-8 strings.
            using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                consumer.Subscribe("timemanagement_booking");

                // Hand each received value to the caller's callback.
                consumer.OnMessage += (_, msg) =>
                {
                    message(msg.Value);
                };

                // Poll forever; each call waits up to 100 milliseconds for events.
                while (true)
                {
                    consumer.Poll(100);
                }
            }
        }
    }
}
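The OnMessage event and the Poll loop belong to the pre-1.0 releases of Confluent.Kafka. For readers on version 1.0 or later of the package, the same Listen contract might be sketched as below, using the builder and the blocking Consume call those versions provide. The class name BookingConsumerV1 and the CancellationToken constructor parameter are additions of this sketch, not part of the original project, and the code relies on the IBookingConsumer interface defined earlier in this post.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

namespace TimeManagement.Streaming.Consumer
{
    // Hypothetical variant of BookingConsumer for Confluent.Kafka 1.0 or later.
    public class BookingConsumerV1 : IBookingConsumer
    {
        private readonly CancellationToken _cancellationToken;

        // The token is an addition of this sketch so that the loop can be stopped.
        public BookingConsumerV1(CancellationToken cancellationToken = default(CancellationToken))
        {
            _cancellationToken = cancellationToken;
        }

        public void Listen(Action<string> message)
        {
            // Same settings as the dictionary-based configuration, as typed properties.
            var config = new ConsumerConfig
            {
                GroupId = "booking_consumer",
                BootstrapServers = "localhost:9092",
                EnableAutoCommit = false
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe("timemanagement_booking");
                try
                {
                    while (true)
                    {
                        // Blocks until a record arrives or the token is cancelled.
                        var result = consumer.Consume(_cancellationToken);
                        message(result.Message.Value);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected when the token is cancelled; continue to Close.
                }
                finally
                {
                    // Leave the consumer group cleanly.
                    consumer.Close();
                }
            }
        }
    }
}
```

As in the original code, automatic offset commits are disabled and nothing commits offsets manually, so the position in the topic is not saved between runs.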



Running .Net Core Kafka Consumer
Now that the .Net Core Kafka consumer code is complete, I will implement the Main method in Program.cs. Inside the Main method, I create a new instance of BookingConsumer and call its Listen method, passing the Console.WriteLine method. Finally, I set TimeManagement.Streaming.Consumer as the startup project and press Ctrl+F5 to run the application.

using System;

class Program
{
    static void Main(string[] args)
    {
        var bookingConsumer = new BookingConsumer();
        // Print every message received from the topic to the console.
        bookingConsumer.Listen(Console.WriteLine);
    }
}
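Because Listen is defined on an interface, the wiring in Main can also be tried out without a running broker. The sketch below swaps in a hypothetical InMemoryBookingConsumer that replays a fixed list of messages. That class is not part of the TimeManagement solution, and the interface is repeated only to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;

public interface IBookingConsumer
{
    void Listen(Action<string> message);
}

// Hypothetical stand-in for BookingConsumer: replays fixed messages instead of reading from Kafka.
public class InMemoryBookingConsumer : IBookingConsumer
{
    private readonly IEnumerable<string> _messages;

    public InMemoryBookingConsumer(IEnumerable<string> messages)
    {
        _messages = messages;
    }

    public void Listen(Action<string> message)
    {
        // Invoke the callback once per stored message, in order.
        foreach (var m in _messages)
        {
            message(m);
        }
    }
}

public static class Program
{
    public static void Main(string[] args)
    {
        IBookingConsumer consumer = new InMemoryBookingConsumer(new[] { "Hello booking consumer" });
        // Same call as in the real Main: every message is handed to Console.WriteLine.
        consumer.Listen(Console.WriteLine);
    }
}
```

Unlike the Kafka-backed consumer, this one returns from Listen as soon as its messages have been replayed.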


Testing .Net Core Kafka Consumer
Now that my .Net Core Kafka consumer is running, I will switch to the command prompt where the built-in producer is running, type the message “Hello booking consumer” and press enter. As a result, the message appears immediately in the .Net Core consumer console.


References
JRE Download: http://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html
7zip Download: http://www.7-zip.org/
Zookeeper Download: https://www.apache.org/dyn/closer.cgi/zookeeper/
Kafka Download: https://kafka.apache.org/downloads
Confluent.Kafka Nuget: https://github.com/confluentinc/confluent-kafka-dotnet/


Thursday, March 14, 2019

Azure-Identity-Management - Uses Groups and Roles


#List the directory roles that are activated in the tenant
Get-AzureADDirectoryRole
#Get the Company Administrator role and list its members
$CompanyAdminRole = Get-AzureADDirectoryRole | Where-Object {$_.DisplayName -eq "Company Administrator"}
Get-AzureADDirectoryRoleMember -ObjectId $CompanyAdminRole.ObjectId
#List all role templates, including roles that are not activated yet
Get-AzureADDirectoryRoleTemplate
#Find the template for the Security Administrator role
$SecurityAdminRoleTemplate = Get-AzureADDirectoryRoleTemplate | Where-Object {$_.DisplayName -eq "Security Administrator"}
#Activate the role
$SecurityAdminRole = Enable-AzureADDirectoryRole -RoleTemplateId $SecurityAdminRoleTemplate.ObjectId
#Add the first user returned by the directory to the role
$user = Get-AzureADUser -Top 1
Add-AzureADDirectoryRoleMember -RefObjectId $user.ObjectId -ObjectId $SecurityAdminRole.ObjectId

Azure-Identity-Management

#Install the AzureAD Module
Find-Module AzureAD
Install-Module -Name AzureAD
Import-Module -Name AzureAD
#Get credentials to connect
$AzureADCredentials = Get-Credential -Message "Credentials to connect to Azure AD"
#Connect to the Azure AD tenant
Connect-AzureAD -Credential $AzureADCredentials
#Get info about the current session
Get-AzureADCurrentSessionInfo
#Get Information about the tenant and domain
Get-AzureADTenantDetail
Get-AzureADDomain