Sunday, August 20, 2017

Mapr 5.2

Async HBase (1.7.0)
 Metrics (5.2.1)
 Drill (1.10)
 Oozie (4.3.0)
 Drill-YARN (1.10)
 Pig (0.16)
 Flume (1.7.0)
 Sentry (1.7.0)
 HBase/MapR-DB Common (1.1)
 Spark (2.1.0)
 HBase (1.1)
 Spark Standalone (2.1.0)
 Hive (2.1)
 Sqoop (1.4.6)
 Hive Metastore (2.1)
 Sqoop2 (2.0.0)
 HTTPFS (1.0)
 Storm (0.10.0)
 Hue (3.12.0)
 Streams Tools (2.0.1)
 Hue Livy (3.12.0)
 Streams Clients (0.9.0)
 Impala (2.7.0)
 Tez (0.8)
 Mahout (0.12)
 YARN + MapReduce (5.2.1)
 Classic MapReduce (5.2.1) 

Wednesday, August 16, 2017

Kafka Object Serialization and DeSerialization

Serialization is the process of converting an object into a stream of bytes 

Deserialization is the process of converting stream of bytes  to an object.

Remember Kafka stores message as array of bytes 

To create a serializer class, we need to implement below interface 
org.apache.kafka.common.serialization.Serializer  


public interface Serializer extends Closeable {

//startup with configuration
  void configure(Map<String, ?> var1, boolean var2);
  byte[] serialize(String var1, T var2);
//method is called when the Kafka session is to be closed 
 void close();
}

To create a deserializer class, we need to implement below interface  org.apache.kafka.common.serialization.Deserializer 


public interface Deserializer extends Closeable {
//startup with configuration.
  void configure(Map<String, ?> var1, boolean var2);
  T deserialize(String var1, byte[] var2);
//method is called when the Kafka session is to be closed
  void close();

}


Its preety simple to implement so lets directly start with example - here I am creating 
1. UserMessage class, 
2. writing its serializer 
3. deserializer
4. using serializer in producer to convert object to byte stream and sending it to kafka
5. using deserializer in comsumer toget the object back from byte stream
 
-- UserMesasge.java-----
package com.valassis.serialization;

public class UserMessage {

 private String key;
   private String value;
   public UserMessage() {
   }
   public String getKey() {
  return key;
 }
 public void setKey(String key) {
  this.key = key;
 }
 public String getValue() {
  return value;
 }
 public void setValue(String value) {
  this.value = value;
 }
 public UserMessage(String key, String value) {
     this.key = key;
     this.value = value;
   }
   @Override public String toString() {
     return "UserMessage(" + key + "=>" + value + ")";
   }
}

---UserMessageSerializer.java
package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;

public class UserMessageSerializer implements Serializer<UserMessage> {

@Override
public void close() {
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub

}

@Override
public byte[] serialize(String topic, UserMessage data) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsString(data).getBytes();
} catch (Exception e) {
e.printStackTrace();
}
return retVal;
}


}
---UserMessageDeserializer.java
package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;

public class UserMessageDeserializer implements Deserializer<UserMessage> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub
}

@Override
public UserMessage deserialize(String topic, byte[] data) {
   ObjectMapper mapper = new ObjectMapper();
   UserMessage userMessage = null;
   try {
    userMessage = mapper.readValue(data, UserMessage.class);
   } catch (Exception e) {
     e.printStackTrace();
   }
   return userMessage;


}

@Override
public void close() {
// TODO Auto-generated method stub
}
}


----ZoneDataSerializerProducer.java 

package com.valassis.msg.producer;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.valassis.msg.generation.ZoneDataMessageCreationLocalFS;
import com.valassis.msg.generation.ZoneDataMessageCreationSerializerLocalFS;
import com.valassis.serialization.UserMessage;

public class ZoneDataSerializerProducer {
 public static void main(String[] args) {

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");

  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer",
    "com.valassis.serialization.UserMessageSerializer");
  props.put("request.required.acks", "1");
  props.put("client.id", "test");
  Producer<String, UserMessage> producer = new KafkaProducer<String, UserMessage>(
    props);
  ZoneDataMessageCreationSerializerLocalFS messages = new ZoneDataMessageCreationSerializerLocalFS();
  Map<String, UserMessage> messageMap = messages.getZoneData();
  System.out.println(messageMap.size());
  messageMap.forEach((key, value) -> {
     
   producer.send(new ProducerRecord<String, UserMessage>("testtopic", key , value));

  });
  producer.close();
 }

}

--- SparkStreamingDeserializerConsumer .java

package com.valassis.msg.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.valassis.serialization.UserMessage;
import com.valassis.serialization.UserMessageDeserializer;

public class SparkStreamingDeserializerConsumer {

static Connection connection;

public static void main(String[] args) throws InterruptedException,
IOException {

SparkConf sparkConf = new SparkConf()
.setAppName("Pawan-JavaKafkaMessageConsumer");

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", UserMessageDeserializer.class);
kafkaParams.put("group.id", "1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("testtopic");

// Create the context with 20 second interval batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(20000));

Configuration conf = HBaseConfiguration.create();
/*
* /user/mapr/zonedatastream 17/07/31 16:36:41 ERROR AsyncProcess:
* Failed to get region location
* org.apache.hadoop.hbase.TableNotFoundException:
* /user/mapr/zonedatastream at org.apache.hadoop.hbase.client.
* ConnectionManager$HConnectionImplementation
* .locateRegionInMeta(ConnectionManager.java:1324)
*/
conf.set("hbase.zookeeper.quorum", "localhost:5181");
conf.set("hbase.master", "localhost:60000");
conf.set("hbase.zookeeper.property.clientPort", "5181");
conf.set("hbase.rootdir", "maprfs:///hbase");

conf.set("username", "mapr");
conf.set("password", "mapr");
System.out.println("getting connection");
connection = ConnectionFactory.createConnection(conf);
System.out.println("connection populated");
table = connection.getTable(TABLE_NAME);

JavaInputDStream<ConsumerRecord<String, UserMessage>> stream = KafkaUtils
.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, UserMessage> Subscribe(topics,
kafkaParams));

// stream.mapToPair(record -> new Tuple2<>(record.key(),
// record.value())).foreachRDD(System.out::println);

// stream.mapToPair(record -> new Tuple2<>(record.key(),
// record.value())).print();
stream.map(record -> print(record.key(), record.value())).print();
;

jssc.start();
table.close();
// jssc.close();
jssc.awaitTermination();
connection.close();

}

// private static final TableName TABLE_NAME =
// TableName.valueOf("/user/mapr/zonedatastream");
// private static final TableName TABLE_NAME =
// TableName.valueOf("/user/mapr/zonedatastream");
// private static final byte[] CF = Bytes.toBytes("cf1");

private static final TableName TABLE_NAME = TableName.valueOf("test_log");
private static final byte[] CF = Bytes.toBytes("name");
private static Table table;

// Storing it to hbase
private static Object print(String key, UserMessage value) throws IOException,
InterruptedException {

// System.out.println("inside print");
// System.out.println("key ->" + key);
// System.out.println("value ->" + value);
// Table table = connection.getTable(TABLE_NAME);
// describe the data we want to write.
Put p = new Put(Bytes.toBytes(key));
p.addColumn(CF, Bytes.toBytes("messagekey"), Bytes.toBytes(value.getKey()));
p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value.getValue()));
table.put(p);
return key;
}

}

Sunday, August 13, 2017

Must Know Technologies for Hadoop developers


Having knowledge on Java is an essential skill to program in Hadoop - Hadoop is an open-source, Java-based programming framework that supports the processing of large data sets in a distributed computing environment. Based on Google’s MapReduce model, Hadoop distributes computing jobs and then combines results. The MapReduce scripts used here are written in Java.

you should also be comfortable with below Hadoop frameworks tools. 




Data injection Storage and It's API - File system/NoSQL DB
     Analytical Processing 
Programing SQL on Hadoop
Apache Flume
Apache Sqoop
Apache Kafka
Apache NiFi
Apache HDFS
Apache HBase(column family)
MongoDB(Document)
Redis DataBase(Key-Value)
Apache MapReduce
Apache Pig
Apache Spark
Apache Tez
Apache Hive
Apache Drill
Cloudera Impala
Apache Phoenix
Kylin




You should also have good understanding of  Service Programming frameworks (Apache Thrift, Apache Zookeeper), Serialization tools(Apache Avro) , Scheduling and Workflow tools (Apache Oozie), Security framework (Apache Sentry) and System deployment tools like Apache Ambari, Cloudera HUE, MAPR Admin UI





Monday, August 7, 2017

Kafka and Spark Streaming

Apache Kafka is a high-throughput distributed messaging system for 
·         handling high volume real time feeds (event, data)
·         low latency delivery
·         fault tolerance
·         <store data as bytes>
·         Can support thousands of topics and hundreds of brokers and process billions of messages per day.
So it’s another messaging system. What’s the difference?
1.    Scalable, No size limit in kafka as its distributed (traditional messaging system has limitation on  RAM, Disk and cant handle big data)
2.    No single point of failure
3.    Millions of message per seconds
4.    Free - Apache 2.0 License
 History -
Originated at Linkedin, Open sourced in 2011, founded Confluent in 2014(US startup)
Implemented in Scala (90%) and Java
Kafka + (Spark Steaming, Storm, Custom apps) for processing 
https://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming



Core Concepts
Topic -  stream of records. They are partitioned and replicated (multi-subscriber)

Kafka will elect “leader” broker for each partitions

Partitions – logic distribution of topic at disk level. Each partition is an ordered, immutable sequence of records that is continually appended to—a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

  • ·         Helps in Scaling as one topic can have multiple partitions.
  • ·         Helps in parallelism
Offset - Messages contained in the partitions are assigned a unique ID number that is called the offset. The role of the offset is to uniquely identify every message within the partition.

Replica - Each partition is replicated across a configurable number of servers for fault tolerance. Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader.

Broker/Server – Responsible for persisting messages to append only Log files Producer and consumer interact with broker to read/write message. Broker can contains multiple partitions of the topic.
·         What happen when broker goes down? Replication to rescue - Partitions are replicated and having 1 leader and others as follower.  All read and write happens at  leader.

Producers - message producers. Async or Sync

Consumers – message consumers. They are grouped by consumer group to read unique message.

ZooKeeper - The ZooKeeper is also responsible for configuration management, leader detection, detecting if any node leaves or joins the cluster, synchronization, etc.


Few Command line options
cd /dfs/kafka_2.11-0.11.0.0
1. start zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
2. start server/broker
nohup bin/kafka-server-start.sh config/server.properties &


3. check list of topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

4.start console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

5. start console consumer


bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning

6.To delete manually:
Shutdown the cluster
Clean kafka log dir (specified by the log.dir attribute in kafka config file ) as well the zookeeper data
Restart the cluster

Case Study –

Writing data to Kafka -  Java client -
package com.valassis.msg.producer;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.valassis.msg.generation.ZoneDataMessageCreationLocalFS;

public class ZOneDataProducer {
       public static void main(String[] args) {

              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");

              props.put("key.serializer",
                           "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer",
                           "org.apache.kafka.common.serialization.StringSerializer");
              props.put("serializer.class", "kafka.serializer.StringEncoder");
              props.put("request.required.acks", "1");
              props.put("client.id", "test");
              Producer<String, String> producer = new KafkaProducer<String, String>(
                           props);
              ZoneDataMessageCreationLocalFS messages = new ZoneDataMessageCreationLocalFS();
              Map<String, String> messageMap = messages.getZoneData();
              System.out.println(messageMap.size());
              messageMap.forEach((key, value) -> {
                     producer.send(new ProducerRecord<String, String>("testtopic", key,
                                  value));

              });
              producer.close();
       }

}



Spark Steaming Consumer code
package com.valassis.msg.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SparkStreamingConsumer {

       static Connection connection;

       public static void main(String[] args) throws InterruptedException,
                     IOException {

              SparkConf sparkConf = new SparkConf()
                           .setAppName("Pawan-JavaKafkaMessageConsumer");

              Map<String, Object> kafkaParams = new HashMap<>();
              kafkaParams.put("bootstrap.servers", "localhost:9092");
              kafkaParams.put("key.deserializer", StringDeserializer.class);
              kafkaParams.put("value.deserializer", StringDeserializer.class);
              kafkaParams.put("group.id", "1");
              kafkaParams.put("auto.offset.reset", "latest");
              kafkaParams.put("enable.auto.commit", false);

              Collection<String> topics = Arrays.asList("testtopic");

              // Create the context with 5 minutes batch size
              JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                           new Duration(300000));

              Configuration conf = HBaseConfiguration.create();
              /*
               * /user/mapr/zonedatastream 17/07/31 16:36:41 ERROR AsyncProcess:
               * Failed to get region location
               * org.apache.hadoop.hbase.TableNotFoundException:
               * /user/mapr/zonedatastream at org.apache.hadoop.hbase.client.
               * ConnectionManager$HConnectionImplementation
               * .locateRegionInMeta(ConnectionManager.java:1324)
               */
              conf.set("hbase.zookeeper.quorum", "localhost:5181");
              conf.set("hbase.master", "localhost:60000");
              conf.set("hbase.zookeeper.property.clientPort", "5181");
              conf.set("hbase.rootdir", "maprfs:///hbase");

              conf.set("username", "mapr");
              conf.set("password", "mapr");
              System.out.println("getting connection");
              connection = ConnectionFactory.createConnection(conf);
              System.out.println("connection populated");
              table = connection.getTable(TABLE_NAME);

              JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                           .createDirectStream(jssc,
                                         LocationStrategies.PreferConsistent(),
                                         ConsumerStrategies.<String, String> Subscribe(topics,
                                                       kafkaParams));

              // stream.mapToPair(record -> new Tuple2<>(record.key(),
              // record.value())).foreachRDD(System.out::println);

              // stream.mapToPair(record -> new Tuple2<>(record.key(),
              // record.value())).print();
              stream.map(record -> print(record.key(), record.value())).print();
              ;

              jssc.start();
              table.close();
              // jssc.close();
              jssc.awaitTermination();
              connection.close();

       }

       // private static final TableName TABLE_NAME =
       // TableName.valueOf("/user/mapr/zonedatastream");
       // private static final TableName TABLE_NAME =
       // TableName.valueOf("/user/mapr/zonedatastream");
       // private static final byte[] CF = Bytes.toBytes("cf1");

       private static final TableName TABLE_NAME = TableName.valueOf("test_log");
       private static final byte[] CF = Bytes.toBytes("name");
       private static Table table;

       // Storing it to hbase
       private static Object print(String key, String value) throws IOException,
                     InterruptedException {

              // System.out.println("inside print");
              // System.out.println("key ->" + key);
              // System.out.println("value ->" + value);
              // Table table = connection.getTable(TABLE_NAME);
              // describe the data we want to write.
              Put p = new Put(Bytes.toBytes(key));
              p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value));
              table.put(p);
              return key;
       }

}



Websphere Dummy certificate expired - DummyServerKeyFile.jks , DummyServerTrustFile.jks

If you faced issue with ibm provided dummy certificate expired just like us and looking for the solution.  This blog is for you.  You can re...