Apache Kafka is a high-throughput distributed messaging system for
·
handling high volume real time feeds (event, data)
·
low
latency delivery
·
fault
tolerance
· <store data
as bytes>
·
Can
support thousands of topics and hundreds of brokers and process billions of
messages per day.
So it’s another
messaging system. What’s the difference?
1.
Scalable, No
size limit in kafka as its distributed (traditional messaging system has
limitation on RAM, Disk and cant handle
big data)
2.
No single
point of failure
3.
Millions
of message per seconds
4.
Free -
Apache 2.0 License
History -
Originated at Linkedin,
Open sourced in 2011, founded Confluent in 2014(US startup)
Implemented in Scala
(90%) and Java
Core Concepts
Topic - stream
of records. They are partitioned and replicated (multi-subscriber)
Kafka will elect
“leader” broker for each partitions
Partitions – logic distribution of topic at disk
level. Each partition is an ordered, immutable sequence of records that is
continually appended to—a structured commit log. The records in the partitions
are each assigned a sequential id number called the offset that uniquely
identifies each record within the partition.
- ·
Helps
in Scaling as one topic can have multiple partitions.
- ·
Helps
in parallelism
Offset - Messages contained in the partitions are assigned a unique ID
number that is called the offset. The role of the offset is to uniquely
identify every message within the partition.
Replica - Each partition is replicated across a
configurable number of servers for fault tolerance. Each partition has one
server which acts as the "leader" and zero or more servers which act
as "followers". The leader handles all read and write requests for
the partition while the followers passively replicate the leader.
Broker/Server – Responsible for persisting messages to
append only Log files Producer and consumer interact with broker to read/write
message. Broker can contains multiple partitions of the topic.
·
What happen
when broker goes down? Replication
to rescue - Partitions are replicated and having 1 leader and others as
follower. All read and write happens
at leader.
Producers - message producers. Async or Sync
Consumers – message consumers. They are grouped by
consumer group to read unique message.
ZooKeeper - The ZooKeeper is also responsible for
configuration management, leader detection, detecting if any node leaves or
joins the cluster, synchronization, etc.
Few Command line options
cd /dfs/kafka_2.11-0.11.0.0
1. start zookeeper
nohup bin/zookeeper-server-start.sh
config/zookeeper.properties &
2. start server/broker
nohup bin/kafka-server-start.sh config/server.properties
&
3. check list of topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.start console producer
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic testtopic
5. start console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning
6.To delete manually:
Shutdown the cluster
Clean kafka log dir (specified by the log.dir attribute in
kafka config file ) as well the zookeeper data
Restart the cluster
Case Study –
Writing data to Kafka -
Java client -
package
com.valassis.msg.producer;
import java.util.Map;
import
java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import
org.apache.kafka.clients.producer.Producer;
import
org.apache.kafka.clients.producer.ProducerRecord;
import
com.valassis.msg.generation.ZoneDataMessageCreationLocalFS;
public class ZOneDataProducer {
public static void main(String[] args) {
Properties
props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("client.id", "test");
Producer<String,
String> producer = new KafkaProducer<String, String>(
props);
ZoneDataMessageCreationLocalFS
messages = new
ZoneDataMessageCreationLocalFS();
Map<String,
String> messageMap = messages.getZoneData();
System.out.println(messageMap.size());
messageMap.forEach((key, value) -> {
producer.send(new
ProducerRecord<String, String>("testtopic", key,
value));
});
producer.close();
}
}
Spark Steaming Consumer code
package
com.valassis.msg.consumer;
import java.io.IOException;
import java.util.Arrays;
import
java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.hbase.HBaseConfiguration;
import
org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import
org.apache.hadoop.hbase.client.ConnectionFactory;
import
org.apache.hadoop.hbase.client.Put;
import
org.apache.hadoop.hbase.client.Table;
import
org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import
org.apache.kafka.common.serialization.StringDeserializer;
import
org.apache.spark.SparkConf;
import
org.apache.spark.streaming.Duration;
import
org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import
org.apache.spark.streaming.kafka010.ConsumerStrategies;
import
org.apache.spark.streaming.kafka010.KafkaUtils;
import
org.apache.spark.streaming.kafka010.LocationStrategies;
public class SparkStreamingConsumer
{
static Connection connection;
public static void main(String[] args) throws
InterruptedException,
IOException
{
SparkConf
sparkConf = new SparkConf()
.setAppName("Pawan-JavaKafkaMessageConsumer");
Map<String,
Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String>
topics = Arrays.asList("testtopic");
// Create the
context with 5 minutes batch size
JavaStreamingContext
jssc = new JavaStreamingContext(sparkConf,
new Duration(300000));
Configuration
conf =
HBaseConfiguration.create();
/*
* /user/mapr/zonedatastream
17/07/31 16:36:41 ERROR AsyncProcess:
* Failed to get region location
* org.apache.hadoop.hbase.TableNotFoundException:
* /user/mapr/zonedatastream at
org.apache.hadoop.hbase.client.
* ConnectionManager$HConnectionImplementation
*
.locateRegionInMeta(ConnectionManager.java:1324)
*/
conf.set("hbase.zookeeper.quorum", "localhost:5181");
conf.set("hbase.master", "localhost:60000");
conf.set("hbase.zookeeper.property.clientPort", "5181");
conf.set("hbase.rootdir", "maprfs:///hbase");
conf.set("username", "mapr");
conf.set("password", "mapr");
System.out.println("getting
connection");
connection = ConnectionFactory.createConnection(conf);
System.out.println("connection
populated");
table = connection.getTable(TABLE_NAME);
JavaInputDStream<ConsumerRecord<String,
String>> stream = KafkaUtils
.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String,
String> Subscribe(topics,
kafkaParams));
//
stream.mapToPair(record -> new Tuple2<>(record.key(),
//
record.value())).foreachRDD(System.out::println);
//
stream.mapToPair(record -> new Tuple2<>(record.key(),
//
record.value())).print();
stream.map(record -> print(record.key(), record.value())).print();
;
jssc.start();
table.close();
// jssc.close();
jssc.awaitTermination();
connection.close();
}
// private static
final TableName TABLE_NAME =
//
TableName.valueOf("/user/mapr/zonedatastream");
// private static
final TableName TABLE_NAME =
//
TableName.valueOf("/user/mapr/zonedatastream");
// private static
final byte[] CF = Bytes.toBytes("cf1");
private static final TableName TABLE_NAME = TableName.valueOf("test_log");
private static final byte[] CF = Bytes.toBytes("name");
private static Table table;
// Storing it to hbase
private static Object print(String key, String value) throws IOException,
InterruptedException
{
//
System.out.println("inside print");
//
System.out.println("key ->" + key);
// System.out.println("value
->" + value);
// Table table =
connection.getTable(TABLE_NAME);
// describe the
data we want to write.
Put
p = new Put(Bytes.toBytes(key));
p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value));
table.put(p);
return key;
}
}