Monday, August 7, 2017

Kafka and Spark Streaming

Apache Kafka is a high-throughput distributed messaging system for
·         handling high-volume real-time feeds (events, data)
·         low-latency delivery
·         fault tolerance
·         storing messages as raw bytes (the payload is opaque to Kafka)
·         supporting thousands of topics across hundreds of brokers and processing billions of messages per day
So it’s another messaging system. What’s the difference?
1.    Scalable – no size limit, since Kafka is distributed (a traditional messaging system is bounded by a single machine's RAM and disk and can't handle big data)
2.    No single point of failure
3.    Millions of messages per second
4.    Free – Apache 2.0 license
History -
Originated at LinkedIn and open sourced in 2011; its creators founded Confluent, a US startup, in 2014.
Implemented in Scala (about 90%) and Java.
Kafka is typically paired with Spark Streaming, Storm, or custom apps for processing.
https://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming



Core Concepts
Topic - a stream of records. Topics are partitioned and replicated, and support multiple subscribers.

Kafka elects a "leader" broker for each partition.

Partitions – the logical distribution of a topic at the disk level. Each partition is an ordered, immutable sequence of records that is continually appended to (a structured commit log). The records in a partition are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

·         Helps in scaling, as one topic can have multiple partitions.
·         Helps in parallelism.
Offset - Messages contained in the partitions are assigned a unique ID number that is called the offset. The role of the offset is to uniquely identify every message within the partition.
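
A minimal sketch of how offsets can be used: the Java consumer can be assigned a partition explicitly and then seek to an arbitrary offset (the topic name, partition number, and offset below are assumptions for illustration).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetSeekSketch {
       public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");
              props.put("group.id", "offset-demo");
              props.put("key.deserializer",
                            "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer",
                            "org.apache.kafka.common.serialization.StringDeserializer");

              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              // assign partition 0 of testtopic and jump to offset 10
              TopicPartition tp = new TopicPartition("testtopic", 0);
              consumer.assign(Collections.singletonList(tp));
              consumer.seek(tp, 10L);
              ConsumerRecords<String, String> records = consumer.poll(1000);
              for (ConsumerRecord<String, String> record : records)
                     System.out.println(record.offset() + " -> " + record.value());
              consumer.close();
       }
}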

Replica - Each partition is replicated across a configurable number of servers for fault tolerance. Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader.

Broker/Server – responsible for persisting messages to append-only log files. Producers and consumers interact with brokers to write and read messages. A broker can contain multiple partitions of a topic.
·         What happens when a broker goes down? Replication to the rescue – partitions are replicated with one leader and the remaining replicas as followers, and since all reads and writes happen at the leader, a follower can be promoted when the leader's broker fails (see the topic-creation sketch below).
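
As a sketch of how partitions and replication fit together, the AdminClient that ships with Kafka 0.11 can create a topic with an explicit partition count and replication factor (the topic name and the counts below are assumptions for illustration; a replication factor of 2 needs at least two brokers).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
       public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");

              try (AdminClient admin = AdminClient.create(props)) {
                     // 3 partitions for parallelism, replication factor 2 so the
                     // topic survives the loss of a single broker
                     NewTopic topic = new NewTopic("testtopic", 3, (short) 2);
                     admin.createTopics(Collections.singletonList(topic)).all().get();
              }
       }
}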

Producers - publish messages to topics; sends can be asynchronous or synchronous.

Consumers – read messages from topics. Consumers are organized into consumer groups so that each message is delivered to only one consumer within a group.
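
A minimal sketch of a plain Java consumer that joins a consumer group (the group id here is an assumption for illustration). Running two copies with the same group.id splits the topic's partitions between them, so each message is consumed by only one of them (assuming the topic has more than one partition).

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumerSketch {
       public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");
              // all members of this group share the topic's partitions
              props.put("group.id", "zone-data-readers");
              props.put("key.deserializer",
                            "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer",
                            "org.apache.kafka.common.serialization.StringDeserializer");

              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              consumer.subscribe(Arrays.asList("testtopic"));
              try {
                     while (true) {
                            ConsumerRecords<String, String> records = consumer.poll(1000);
                            for (ConsumerRecord<String, String> record : records)
                                   System.out.println(record.partition() + "/"
                                                 + record.offset() + " -> " + record.value());
                     }
              } finally {
                     consumer.close();
              }
       }
}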

ZooKeeper - responsible for configuration management, leader election, detecting when any node joins or leaves the cluster, synchronization, etc.


A few command-line options
cd /dfs/kafka_2.11-0.11.0.0
1. start zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
2. start server/broker
nohup bin/kafka-server-start.sh config/server.properties &


3. list the topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

4. start console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

5. start console consumer


bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning

6. to delete a topic manually:
Shut down the cluster
Clean the Kafka log directory (specified by the log.dir attribute in the Kafka config file) as well as the ZooKeeper data
Restart the cluster

Case Study –

Writing data to Kafka - Java client
package com.valassis.msg.producer;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.valassis.msg.generation.ZoneDataMessageCreationLocalFS;

public class ZOneDataProducer {
       public static void main(String[] args) {

              Properties props = new Properties();
              // broker(s) to bootstrap from
              props.put("bootstrap.servers", "localhost:9092");

              // keys and values are sent as plain strings
              props.put("key.serializer",
                            "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer",
                            "org.apache.kafka.common.serialization.StringSerializer");
              // wait for the partition leader to acknowledge each write
              // ("serializer.class" / "request.required.acks" belong to the legacy
              // Scala producer; the new KafkaProducer uses "acks")
              props.put("acks", "1");
              props.put("client.id", "test");
              Producer<String, String> producer = new KafkaProducer<String, String>(
                            props);
              ZoneDataMessageCreationLocalFS messages = new ZoneDataMessageCreationLocalFS();
              Map<String, String> messageMap = messages.getZoneData();
              System.out.println(messageMap.size());
              messageMap.forEach((key, value) -> {
                     producer.send(new ProducerRecord<String, String>("testtopic", key,
                                  value));

              });
              producer.close();
       }

}
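
producer.send() is asynchronous, so in the code above a delivery failure would only surface through the Future it returns. A hedged variant of the send call inside the forEach, using the callback overload of send to log delivery errors (the logging itself is just an illustration):

              producer.send(new ProducerRecord<String, String>("testtopic", key,
                            value), (metadata, exception) -> {
                     if (exception != null) {
                            // delivery failed - log it (or retry/queue it, as needed)
                            exception.printStackTrace();
                     } else {
                            System.out.println("wrote to partition " + metadata.partition()
                                          + " at offset " + metadata.offset());
                     }
              });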



Spark Streaming Consumer code
package com.valassis.msg.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SparkStreamingConsumer {

       static Connection connection;

       public static void main(String[] args) throws InterruptedException,
                     IOException {

              SparkConf sparkConf = new SparkConf()
                           .setAppName("Pawan-JavaKafkaMessageConsumer");

              Map<String, Object> kafkaParams = new HashMap<>();
              kafkaParams.put("bootstrap.servers", "localhost:9092");
              kafkaParams.put("key.deserializer", StringDeserializer.class);
              kafkaParams.put("value.deserializer", StringDeserializer.class);
              kafkaParams.put("group.id", "1");
              kafkaParams.put("auto.offset.reset", "latest");
              kafkaParams.put("enable.auto.commit", false);

              Collection<String> topics = Arrays.asList("testtopic");

              // Create the context with 5 minutes batch size
              JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                           new Duration(300000));

              Configuration conf = HBaseConfiguration.create();
              /*
               * /user/mapr/zonedatastream 17/07/31 16:36:41 ERROR AsyncProcess:
               * Failed to get region location
               * org.apache.hadoop.hbase.TableNotFoundException:
               * /user/mapr/zonedatastream at org.apache.hadoop.hbase.client.
               * ConnectionManager$HConnectionImplementation
               * .locateRegionInMeta(ConnectionManager.java:1324)
               */
              conf.set("hbase.zookeeper.quorum", "localhost:5181");
              conf.set("hbase.master", "localhost:60000");
              conf.set("hbase.zookeeper.property.clientPort", "5181");
              conf.set("hbase.rootdir", "maprfs:///hbase");

              conf.set("username", "mapr");
              conf.set("password", "mapr");
              System.out.println("getting connection");
              connection = ConnectionFactory.createConnection(conf);
              System.out.println("connection populated");
              table = connection.getTable(TABLE_NAME);

              JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                           .createDirectStream(jssc,
                                         LocationStrategies.PreferConsistent(),
                                         ConsumerStrategies.<String, String> Subscribe(topics,
                                                       kafkaParams));

              // stream.mapToPair(record -> new Tuple2<>(record.key(),
              // record.value())).foreachRDD(System.out::println);

              // stream.mapToPair(record -> new Tuple2<>(record.key(),
              // record.value())).print();
              // write each record to HBase via print() and show the returned keys
              stream.map(record -> print(record.key(), record.value())).print();

              jssc.start();
              jssc.awaitTermination();
              // close HBase resources only after the streaming context stops
              table.close();
              connection.close();

       }

       // private static final TableName TABLE_NAME =
       // TableName.valueOf("/user/mapr/zonedatastream");
       // private static final byte[] CF = Bytes.toBytes("cf1");

       private static final TableName TABLE_NAME = TableName.valueOf("test_log");
       private static final byte[] CF = Bytes.toBytes("name");
       private static Table table;

       // Storing it to hbase
       private static Object print(String key, String value) throws IOException,
                     InterruptedException {

              // System.out.println("inside print");
              // System.out.println("key ->" + key);
              // System.out.println("value ->" + value);
              // Table table = connection.getTable(TABLE_NAME);
              // describe the data we want to write.
              Put p = new Put(Bytes.toBytes(key));
              p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value));
              table.put(p);
              return key;
       }

}
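
One caveat about the consumer above: the static HBase connection and table are created on the driver, so calling print() inside a transformation only works when Spark runs in local mode; on a cluster the executors are separate JVMs in which those static fields were never initialized. A common alternative, sketched below under that assumption, is to write inside foreachRDD/foreachPartition and open the HBase connection on the executor that processes each partition (this would replace the stream.map(...).print() line; same table and column family as above):

              stream.foreachRDD(rdd -> rdd.foreachPartition(records -> {
                     // runs on the executor: open HBase resources locally
                     Configuration hconf = HBaseConfiguration.create();
                     hconf.set("hbase.zookeeper.quorum", "localhost:5181");
                     try (Connection conn = ConnectionFactory.createConnection(hconf);
                                   Table hTable = conn.getTable(TableName.valueOf("test_log"))) {
                            while (records.hasNext()) {
                                   ConsumerRecord<String, String> r = records.next();
                                   Put p = new Put(Bytes.toBytes(r.key()));
                                   p.addColumn(Bytes.toBytes("name"), Bytes.toBytes("value"),
                                                 Bytes.toBytes(r.value()));
                                   hTable.put(p);
                            }
                     }
              }));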



