
Thursday, October 24, 2019

Migrating Hive to Spark




Migration from Hive aggregates to Spark dataframe

Overview




Hive became a very popular SQL solution on big data platforms, especially with Oracle/SQL developers. Hive is good at batch processing with a MapReduce execution engine, and it can also use Spark or Tez as its execution engine. Spark, on the other hand, is fast becoming the de facto standard in the Hadoop ecosystem for batch processing. It is a parallel processing framework that helps us create programs that can run on a cluster. 



In Q3 2019 our main focus was to convert one of our Hive implementations, written in 2017 for targeting, to Spark Scala for targeting and geo-spatial visualization. The project goal was to refactor the existing Hive SQL code to use the Spark Scala DataFrame API, and also to consolidate Hadoop processing and simplify multi-server file transfers. All the processing logic was in HiveQL, as the solution was implemented in Hive. The task at hand was to convert all of that SQL to DataFrames.  


Our Goal



The goals were:


1. Migrate the application from the MapR datalake to our own datalake (Hortonworks flavored).  


2. Consolidate Hadoop processing at the source and simplify multi-host file transfers. Previously we received parquet files from another cluster for processing; now we generate the data and process it on the same datalake. 

3. Take the opportunity to redesign the old system, written in 2017, around a new set of tools and technologies. Hence we redesigned the solution using Spark/Scala as the processing engine and parquet (on HDFS) as storage.  




Team 

It wasn't an easy start for us (a team of two), as we were a completely new team on new infrastructure. As with everyone, we also needed to put in extra time to understand the new tools and technologies, learning new terms almost every day for the initial month.  

About Old System


The old application was built with Hive, Sqoop, and shell scripts, and the jobs were scheduled by Oozie. 


Dataflow 

We get parquet files
     -> external Hive tables, with data from the RDBMS (Sqoop)
          -> Hive processing
               -> export the resultant tables to csv
                    -> feed the csv to the database


Old dataflow with Hive and Sqoop




New System



The new application was written in Spark (Scala), using dropbox (DI) for the RDBMS data, and scheduled via stream (our proprietary scheduling platform, similar to Oozie). 

New Dataflow 

    -> input feed via layouts
        -> Spark DataFrame aggregations to process the data
           -> Spark to convert the parquet results to csv
              -> send the data to our sftp site to feed it into the RDBMS





Input 


We used plain csv and converted it to parquet via Spark to feed into the Spark aggregates. 
It's also important to do as much work as you can in the external relational database engine so you only import the data you need.
The other input was already in parquet.
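
As a rough illustration of that csv-to-parquet feed step (the paths and the header/inferSchema options here are assumptions, not the actual job):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-to-parquet-feed").getOrCreate()

// read the plain csv feed (illustrative path) and persist it as parquet for the aggregates
val feed = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/landing/targeting_feed/*.csv")

feed.write.mode("overwrite").parquet("/staging/targeting_feed_parquet")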

Output


We write the output in parquet and added an additional processing step in Spark to convert the parquet to csv. 
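
A minimal sketch of that parquet-to-csv step (paths are illustrative; the partition count is an assumption and ties into the parallelism point in the tuning section below):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-to-csv-export").getOrCreate()

val result = spark.read.parquet("/output/rollup_parquet")

// keep several partitions so the csv write stays parallel;
// coalesce(1) would produce a single file but serializes the write
result.repartition(16)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("/output/rollup_csv")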


Processing

One simple example of the hundreds of SQLs converted for this project is below.

The HiveQL:
select substr(DIG.GEOCODE, 1,5) as geocode, DIG.topic_id, DIG.DMA, sum(DIG.DMA_SCORE *GEOS.HHLD_S)/sum(GEOS.HHLD_S) as DMA_SCORE_rollup,
sum(DIG.NATIONAL_SCORE *GEOS.HHLD_S)/sum(GEOS.HHLD_S) as NATIONAL_SCORE_rollup
from IMP_DIG_interest_DTZ dig, imp_dig_rollup_geos geos
where DIG.GEOCODE = GEOS.GEOCODE
group by geocode,  DIG.topic_id, DIG.DMA;

The equivalent implementation using the Spark DataFrame API is shown below.
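
What follows is only a minimal sketch rather than the original production code: the table and column names come from the HiveQL above, while the SparkSession setup and the parquet paths are illustrative assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("hive-to-spark-rollup").getOrCreate()

// hypothetical parquet locations standing in for the two Hive tables
val dig  = spark.read.parquet("/data/imp_dig_interest_dtz")
val geos = spark.read.parquet("/data/imp_dig_rollup_geos")

val rollup = dig.join(geos, dig("GEOCODE") === geos("GEOCODE"))
  .groupBy(substring(dig("GEOCODE"), 1, 5).as("geocode"), dig("topic_id"), dig("DMA"))
  .agg(
    (sum(dig("DMA_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S"))).as("DMA_SCORE_rollup"),
    (sum(dig("NATIONAL_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S"))).as("NATIONAL_SCORE_rollup")
  )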





Performance tuning

I do want to touch on performance, as we faced slowness initially, especially while writing csv files from parquet. Some of our learnings are as follows:


  1. Use parallelism when writing the csv
  2. Optimize DataFrame calls to use map-side (broadcast) joins, as sketched after this list
  3. groupBy can't be avoided in our case, so a shuffle is expected, but the idea is to reduce it as much as possible. 
  4. We did some optimization using the Spark UI. Caching DataFrames in specific places also helped improve the performance of our jobs (also sketched below).
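
A small sketch of points 2 and 4, reusing the dig and geos DataFrames from the processing example above (the grouping columns here are just for illustration):

import org.apache.spark.sql.functions.broadcast

// point 2: hint Spark to broadcast the smaller geo lookup so the join stays map-side
val joined = dig.join(broadcast(geos), dig("GEOCODE") === geos("GEOCODE"))

// point 4: cache a DataFrame that feeds several aggregations so it is computed only once
joined.cache()
val countsByDma   = joined.groupBy(dig("DMA")).count()
val countsByTopic = joined.groupBy(dig("topic_id")).count()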

dataset | size of csv produced | time taken (before tuning)             | time taken (after tuning)
DTZ     | ~7GB                 | 28 minutes                             | 6 minutes
ATZ     | ~2.5GB               | 12 minutes                             | 5.9 minutes
ZIP     | ~2GB                 | 10 minutes                             | 5 minutes
PCR     | ~23GB                | 200+ minutes (failed due to OOM error) | 11 minutes



After the migration, we noticed that the execution time of our batch was reduced by 50%-70% or more. 


It could be a combination of more hardware and Spark vs Hive. Spark and Hive have different hardware needs, but our new cluster is huge compared to the old MapR one, so it's a combination of both. 



In some cases, we struggled with memory issues in Spark with no apparent solution apart from increasing the memory available on each node of the cluster. In theory, when a Spark process consumes all of its memory it starts to spill to disk and continues processing the data, but that isn't always the case; sometimes it just breaks.


We've gained a lot by migrating our old Hive aggregations into Spark. As I mentioned earlier, Hive is a very robust technology: your processes can take time, but they do complete most of the time. In Spark, you need a lot of memory resources to get a similar level of reliability, but we have a big cluster now! 

Other than the initial hiccups, it was a pleasure writing the application using Spark. I thoroughly enjoyed it, and I am now working on another project to convert an Apache Drill implementation to Spark.


Thursday, September 14, 2017

importing relational database table to hbase table - Sqoop

For importing a relational database table into an HBase table, Sqoop is the best tool for the job.

It's a two-step process:

1. create the HBase table

2. import the data from the relational table into the HBase table using sqoop import

  • HBase - create the table in HBase  

[mapr@hostname lib]$ hbase shell

hbase(main):002:0> create 'DV_PRODUCTION_BRANCHES', 'DETAILS'

  • Sqoop - copy the Oracle driver and load the data into the HBase table using the commands below.

1. copy the Oracle JDBC driver to the Sqoop lib directory - /opt/mapr/sqoop/sqoop-1.4.6/lib/

2. run sqoop import
sqoop import --connect jdbc:oracle:thin:@LOCALHOST:1521/DVTST --username DV_PRD --password dv_tst \
--table DV_PRODUCTION_BRANCHES  \
--columns "PRODUCTION_BRANCH_ID,NAME" \
--hbase-table 'DV_PRODUCTION_BRANCHES' \
--column-family DETAILS \
--hbase-row-key PRODUCTION_BRANCH_ID \
-m 1

Points to be noted - 
  • DV_PRODUCTION_BRANCHES is the relational table in the DVTST database 
  • DV_PRODUCTION_BRANCHES is also the name of the HBase table, with column family DETAILS and row key PRODUCTION_BRANCH_ID 
  • Sqoop didn't permit multiple column families at the time this blog was written. You need to create the HBase table first with the multiple column families and then execute a separate Sqoop import operation for each column family. 

Wednesday, August 16, 2017

Kafka Object Serialization and DeSerialization

Serialization is the process of converting an object into a stream of bytes.

Deserialization is the process of converting a stream of bytes back into an object.

Remember, Kafka stores messages as arrays of bytes.

To create a serializer class, we need to implement the interface below:
org.apache.kafka.common.serialization.Serializer


public interface Serializer<T> extends Closeable {

  // called at startup with the configuration
  void configure(Map<String, ?> configs, boolean isKey);
  byte[] serialize(String topic, T data);
  // called when the Kafka session is to be closed
  void close();
}

To create a deserializer class, we need to implement the interface below:
org.apache.kafka.common.serialization.Deserializer


public interface Deserializer<T> extends Closeable {

  // called at startup with the configuration
  void configure(Map<String, ?> configs, boolean isKey);
  T deserialize(String topic, byte[] data);
  // called when the Kafka session is to be closed
  void close();
}


It's pretty simple to implement, so let's start directly with an example. Here I am:
1. creating a UserMessage class,
2. writing its serializer,
3. writing its deserializer,
4. using the serializer in a producer to convert the object to a byte stream and send it to Kafka,
5. using the deserializer in a consumer to get the object back from the byte stream.
 
-- UserMessage.java -----
package com.valassis.serialization;

public class UserMessage {

  private String key;
  private String value;

  public UserMessage() {
  }

  public String getKey() {
    return key;
  }

  public void setKey(String key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }

  public UserMessage(String key, String value) {
    this.key = key;
    this.value = value;
  }

  @Override
  public String toString() {
    return "UserMessage(" + key + "=>" + value + ")";
  }
}

---UserMessageSerializer.java
package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;

public class UserMessageSerializer implements Serializer<UserMessage> {

  @Override
  public void close() {
    // nothing to clean up
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // no configuration needed
  }

  @Override
  public byte[] serialize(String topic, UserMessage data) {
    // convert the UserMessage object to JSON and return its bytes
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      retVal = objectMapper.writeValueAsString(data).getBytes();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
  }
}
---UserMessageDeserializer.java
package com.valassis.serialization;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;

public class UserMessageDeserializer implements Deserializer<UserMessage> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // no configuration needed
  }

  @Override
  public UserMessage deserialize(String topic, byte[] data) {
    // read the JSON bytes back into a UserMessage object
    ObjectMapper mapper = new ObjectMapper();
    UserMessage userMessage = null;
    try {
      userMessage = mapper.readValue(data, UserMessage.class);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return userMessage;
  }

  @Override
  public void close() {
    // nothing to clean up
  }
}


----ZoneDataSerializerProducer.java 

package com.valassis.msg.producer;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.valassis.msg.generation.ZoneDataMessageCreationSerializerLocalFS;
import com.valassis.serialization.UserMessage;

public class ZoneDataSerializerProducer {
 public static void main(String[] args) {

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");

  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer",
    "com.valassis.serialization.UserMessageSerializer");
  props.put("request.required.acks", "1");
  props.put("client.id", "test");
  Producer<String, UserMessage> producer = new KafkaProducer<String, UserMessage>(
    props);
  ZoneDataMessageCreationSerializerLocalFS messages = new ZoneDataMessageCreationSerializerLocalFS();
  Map<String, UserMessage> messageMap = messages.getZoneData();
  System.out.println(messageMap.size());
  messageMap.forEach((key, value) -> {
     
   producer.send(new ProducerRecord<String, UserMessage>("testtopic", key , value));

  });
  producer.close();
 }

}

--- SparkStreamingDeserializerConsumer .java

package com.valassis.msg.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.valassis.serialization.UserMessage;
import com.valassis.serialization.UserMessageDeserializer;

public class SparkStreamingDeserializerConsumer {

static Connection connection;

public static void main(String[] args) throws InterruptedException,
IOException {

SparkConf sparkConf = new SparkConf()
.setAppName("Pawan-JavaKafkaMessageConsumer");

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", UserMessageDeserializer.class);
kafkaParams.put("group.id", "1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("testtopic");

// Create the context with 20 second interval batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(20000));

Configuration conf = HBaseConfiguration.create();
/*
* /user/mapr/zonedatastream 17/07/31 16:36:41 ERROR AsyncProcess:
* Failed to get region location
* org.apache.hadoop.hbase.TableNotFoundException:
* /user/mapr/zonedatastream at org.apache.hadoop.hbase.client.
* ConnectionManager$HConnectionImplementation
* .locateRegionInMeta(ConnectionManager.java:1324)
*/
conf.set("hbase.zookeeper.quorum", "localhost:5181");
conf.set("hbase.master", "localhost:60000");
conf.set("hbase.zookeeper.property.clientPort", "5181");
conf.set("hbase.rootdir", "maprfs:///hbase");

conf.set("username", "mapr");
conf.set("password", "mapr");
System.out.println("getting connection");
connection = ConnectionFactory.createConnection(conf);
System.out.println("connection populated");
table = connection.getTable(TABLE_NAME);

JavaInputDStream<ConsumerRecord<String, UserMessage>> stream = KafkaUtils
.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, UserMessage> Subscribe(topics,
kafkaParams));

// stream.mapToPair(record -> new Tuple2<>(record.key(),
// record.value())).foreachRDD(System.out::println);

// stream.mapToPair(record -> new Tuple2<>(record.key(),
// record.value())).print();
stream.map(record -> print(record.key(), record.value())).print();

jssc.start();
jssc.awaitTermination();
// close HBase resources once streaming terminates
table.close();
connection.close();

}

// private static final TableName TABLE_NAME =
// TableName.valueOf("/user/mapr/zonedatastream");
// private static final TableName TABLE_NAME =
// TableName.valueOf("/user/mapr/zonedatastream");
// private static final byte[] CF = Bytes.toBytes("cf1");

private static final TableName TABLE_NAME = TableName.valueOf("test_log");
private static final byte[] CF = Bytes.toBytes("name");
private static Table table;

// Storing it to hbase
private static Object print(String key, UserMessage value) throws IOException,
InterruptedException {

// System.out.println("inside print");
// System.out.println("key ->" + key);
// System.out.println("value ->" + value);
// Table table = connection.getTable(TABLE_NAME);
// describe the data we want to write.
Put p = new Put(Bytes.toBytes(key));
p.addColumn(CF, Bytes.toBytes("messagekey"), Bytes.toBytes(value.getKey()));
p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value.getValue()));
table.put(p);
return key;
}

}

Monday, August 7, 2017

Kafka and Spark Streaming

Apache Kafka is a high-throughput distributed messaging system for:
  • handling high-volume real-time feeds (events, data)
  • low-latency delivery
  • fault tolerance
  • storing data as bytes
  • supporting thousands of topics and hundreds of brokers, processing billions of messages per day
So it’s another messaging system. What’s the difference?
1. Scalable - no size limit in Kafka as it's distributed (traditional messaging systems are limited by RAM and disk and can't handle big data)
2. No single point of failure
3. Millions of messages per second
4. Free - Apache 2.0 license
History -
Originated at LinkedIn, open sourced in 2011; its creators founded Confluent in 2014 (a US startup)
Implemented in Scala (90%) and Java
Kafka + (Spark Streaming, Storm, custom apps) for processing 
https://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming



Core Concepts
Topic - a stream of records. Topics are partitioned and replicated (multi-subscriber).

Kafka will elect a “leader” broker for each partition.

Partitions – the logical distribution of a topic at the disk level. Each partition is an ordered, immutable sequence of records that is continually appended to - a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

  • Helps in scaling, as one topic can have multiple partitions.
  • Helps in parallelism.
Offset - Messages contained in the partitions are assigned a unique ID number that is called the offset. The role of the offset is to uniquely identify every message within the partition.

Replica - Each partition is replicated across a configurable number of servers for fault tolerance. Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader.

Broker/Server – responsible for persisting messages to append-only log files. Producers and consumers interact with the broker to read/write messages. A broker can contain multiple partitions of a topic.
  • What happens when a broker goes down? Replication to the rescue - partitions are replicated, with one leader and the others as followers. All reads and writes happen at the leader.

Producers - message producers. Async or Sync

Consumers – message consumers. They are grouped into consumer groups so that each message is read by only one consumer in the group.

ZooKeeper - ZooKeeper is responsible for configuration management, leader detection, detecting when any node leaves or joins the cluster, synchronization, etc.
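
To tie the partition and replication concepts to code: Kafka also ships a Java AdminClient (since 0.11, I believe) that lets you create a topic with an explicit partition count and replication factor. A minimal sketch, written in Scala here; the broker address, topic name, and counts are illustrative:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val admin = AdminClient.create(props)

// 3 partitions for parallelism, replication factor 1 (single-broker dev setup)
val topic = new NewTopic("testtopic", 3, 1.toShort)
admin.createTopics(Collections.singleton(topic)).all().get()

admin.close()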


Few Command line options
cd /dfs/kafka_2.11-0.11.0.0
1. start zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
2. start server/broker
nohup bin/kafka-server-start.sh config/server.properties &


3. check list of topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

4. start console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

5. start console consumer


bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning

6. To delete a topic manually:
Shut down the cluster
Clean the Kafka log dir (specified by the log.dir attribute in the Kafka config file) as well as the ZooKeeper data
Restart the cluster

Case Study –

Writing data to Kafka -  Java client -
package com.valassis.msg.producer;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.valassis.msg.generation.ZoneDataMessageCreationLocalFS;

public class ZOneDataProducer {
       public static void main(String[] args) {

              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");

              props.put("key.serializer",
                           "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer",
                           "org.apache.kafka.common.serialization.StringSerializer");
              props.put("serializer.class", "kafka.serializer.StringEncoder");
              props.put("request.required.acks", "1");
              props.put("client.id", "test");
              Producer<String, String> producer = new KafkaProducer<String, String>(
                           props);
              ZoneDataMessageCreationLocalFS messages = new ZoneDataMessageCreationLocalFS();
              Map<String, String> messageMap = messages.getZoneData();
              System.out.println(messageMap.size());
              messageMap.forEach((key, value) -> {
                     producer.send(new ProducerRecord<String, String>("testtopic", key,
                                  value));

              });
              producer.close();
       }

}



Spark Streaming Consumer code
package com.valassis.msg.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SparkStreamingConsumer {

       static Connection connection;

       public static void main(String[] args) throws InterruptedException,
                     IOException {

              SparkConf sparkConf = new SparkConf()
                           .setAppName("Pawan-JavaKafkaMessageConsumer");

              Map<String, Object> kafkaParams = new HashMap<>();
              kafkaParams.put("bootstrap.servers", "localhost:9092");
              kafkaParams.put("key.deserializer", StringDeserializer.class);
              kafkaParams.put("value.deserializer", StringDeserializer.class);
              kafkaParams.put("group.id", "1");
              kafkaParams.put("auto.offset.reset", "latest");
              kafkaParams.put("enable.auto.commit", false);

              Collection<String> topics = Arrays.asList("testtopic");

              // Create the context with 5 minutes batch size
              JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                           new Duration(300000));

              Configuration conf = HBaseConfiguration.create();
              /*
               * /user/mapr/zonedatastream 17/07/31 16:36:41 ERROR AsyncProcess:
               * Failed to get region location
               * org.apache.hadoop.hbase.TableNotFoundException:
               * /user/mapr/zonedatastream at org.apache.hadoop.hbase.client.
               * ConnectionManager$HConnectionImplementation
               * .locateRegionInMeta(ConnectionManager.java:1324)
               */
              conf.set("hbase.zookeeper.quorum", "localhost:5181");
              conf.set("hbase.master", "localhost:60000");
              conf.set("hbase.zookeeper.property.clientPort", "5181");
              conf.set("hbase.rootdir", "maprfs:///hbase");

              conf.set("username", "mapr");
              conf.set("password", "mapr");
              System.out.println("getting connection");
              connection = ConnectionFactory.createConnection(conf);
              System.out.println("connection populated");
              table = connection.getTable(TABLE_NAME);

              JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                           .createDirectStream(jssc,
                                         LocationStrategies.PreferConsistent(),
                                         ConsumerStrategies.<String, String> Subscribe(topics,
                                                       kafkaParams));

              // stream.mapToPair(record -> new Tuple2<>(record.key(),
              // record.value())).foreachRDD(System.out::println);

              // stream.mapToPair(record -> new Tuple2<>(record.key(),
              // record.value())).print();
               stream.map(record -> print(record.key(), record.value())).print();

               jssc.start();
               jssc.awaitTermination();
               // close HBase resources once streaming terminates
               table.close();
               connection.close();

       }

       // private static final TableName TABLE_NAME =
       // TableName.valueOf("/user/mapr/zonedatastream");
       // private static final TableName TABLE_NAME =
       // TableName.valueOf("/user/mapr/zonedatastream");
       // private static final byte[] CF = Bytes.toBytes("cf1");

       private static final TableName TABLE_NAME = TableName.valueOf("test_log");
       private static final byte[] CF = Bytes.toBytes("name");
       private static Table table;

       // Storing it to hbase
       private static Object print(String key, String value) throws IOException,
                     InterruptedException {

              // System.out.println("inside print");
              // System.out.println("key ->" + key);
              // System.out.println("value ->" + value);
              // Table table = connection.getTable(TABLE_NAME);
              // describe the data we want to write.
              Put p = new Put(Bytes.toBytes(key));
              p.addColumn(CF, Bytes.toBytes("value"), Bytes.toBytes(value));
              table.put(p);
              return key;
       }

}


