Thursday, October 24, 2019

Migrating Hive to Spark




Migrating Hive aggregates to Spark DataFrames

Overview




Hive became a very popular SQL solution on big data platforms, especially with Oracle/SQL developers. Hive is good at batch processing with a MapReduce execution engine; it can also use Spark or Tez as its execution engine. Spark, on the other hand, is fast becoming the de facto standard in the Hadoop ecosystem for batch processing. It is a parallel processing framework that helps us create programs that can run on a cluster.



In Q3 2019 our main focus was to convert one of our Hive implementations, written in 2017 for targeting and geo-spatial visualization, to Spark Scala. The project goal was to refactor the existing Hive SQL code to use the Spark Scala DataFrame API, and also to consolidate Hadoop processing and simplify multi-server file transfers. All the processing logic was in HiveQL, as the solution was implemented in Hive, so the task at hand was to convert all the SQL to DataFrame operations.


Our Goal



The goals were:


1. To migrate the application from the MapR data lake to our own (Hortonworks-flavored) data lake.


2. To consolidate Hadoop processing at the source and simplify multi-host file transfers. Previously we received parquet files from another cluster for processing; now we generate and process the data on the same data lake, so everything runs on one cluster.

3. To take the opportunity to redesign the old system, written in 2017, with a new set of tools and technologies. Hence we redesigned the solution using Spark/Scala as the processing engine and parquet (on HDFS) as storage.




Team 

It wasn't an easy start for us (a team of two), as we were a completely new team on new infrastructure. As with everyone, we also needed to put in extra time to understand the new tools and technologies, and we were learning new terms almost every day for the first month.

About Old System


The old application was written with Hive, Sqoop, and shell scripts, and the jobs were scheduled by Oozie.


Dataflow 

We receive parquet files
     -> external Hive table with data from the RDBMS (Sqoop)
          -> Hive processing
               -> export the resultant tables to CSV
                    -> feed the CSV to the database


Old dataflow with Hive and Sqoop




New System



The new application was written in Spark (Scala), using dropbox (DI) for RDBMS data, and scheduled via stream (our proprietary scheduling platform, similar to Oozie).

New Dataflow 

    -> input feed via layouts
        -> Spark DataFrame aggregates to process the data
           -> Spark converts the parquet results to CSV
              -> send the data to our SFTP side to feed it to the RDBMS





Input


We used plain CSV files and converted them to parquet via Spark to feed into the Spark aggregates.
It's also important to do as much work as you can in the external relational database engine, so you import only the data you actually need.
The other input was already in parquet.
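As a minimal sketch of that conversion step (the paths, header option, and schema inference here are assumptions about the input layout, not the actual job):

```scala
import org.apache.spark.sql.SparkSession

object CsvToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-to-parquet")
      .getOrCreate()

    // Read the plain CSV feed; header row and schema inference
    // are assumptions about the input layout.
    val input = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/data/input/feed.csv") // hypothetical path

    // Write as parquet so the downstream aggregates read a columnar format.
    input.write
      .mode("overwrite")
      .parquet("/data/staging/feed_parquet") // hypothetical path

    spark.stop()
  }
}
```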

Output


We write the output in parquet, and we wrote an additional processing step in Spark to convert the parquet to CSV.
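A sketch of that conversion step might look like the following (paths are hypothetical; whether the part files are later merged is an assumption about the downstream feed):

```scala
import org.apache.spark.sql.SparkSession

object ParquetToCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parquet-to-csv")
      .getOrCreate()

    val result = spark.read.parquet("/data/output/result_parquet") // hypothetical path

    // Write CSV in parallel, one file per partition; the downstream
    // transfer step can concatenate the part files if a single file is needed.
    result.write
      .mode("overwrite")
      .option("header", "true")
      .csv("/data/output/result_csv") // hypothetical path

    spark.stop()
  }
}
```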


Processing

Below is one simple example of the hundreds of SQL statements converted for this project.

From this HiveQL:
select substr(DIG.GEOCODE, 1,5) as geocode, DIG.topic_id, DIG.DMA, sum(DIG.DMA_SCORE *GEOS.HHLD_S)/sum(GEOS.HHLD_S) as DMA_SCORE_rollup,
sum(DIG.NATIONAL_SCORE *GEOS.HHLD_S)/sum(GEOS.HHLD_S) as NATIONAL_SCORE_rollup
from IMP_DIG_interest_DTZ dig, imp_dig_rollup_geos geos
where DIG.GEOCODE = GEOS.GEOCODE
group by geocode,  DIG.topic_id, DIG.DMA;

to the equivalent implementation using the Spark DataFrame API:
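A sketch of that translation, assuming `dig` and `geos` are DataFrames loaded from the IMP_DIG_interest_DTZ and imp_dig_rollup_geos data (only the column names are taken from the SQL above; everything else is illustrative):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{substring, sum}

// Rollup of DMA and national scores, weighted by household counts (HHLD_S),
// grouped by the 5-character geocode prefix, topic_id, and DMA.
def scoreRollup(dig: DataFrame, geos: DataFrame): DataFrame =
  dig.join(geos, dig("GEOCODE") === geos("GEOCODE"))
    .groupBy(
      substring(dig("GEOCODE"), 1, 5).as("geocode"),
      dig("topic_id"),
      dig("DMA"))
    .agg(
      (sum(dig("DMA_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S")))
        .as("DMA_SCORE_rollup"),
      (sum(dig("NATIONAL_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S")))
        .as("NATIONAL_SCORE_rollup"))
```

Note that the HiveQL implicit join with a WHERE clause becomes an explicit `join`, and the `substr(...) as geocode` alias that HiveQL lets you reference in GROUP BY becomes an aliased column expression inside `groupBy`.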





Performance tuning

I do want to touch on performance, as we faced slowness initially, especially while writing CSV files from parquet. Some of our learnings are as follows:


  1. Use parallelism when writing CSV.
  2. Optimize DataFrame calls to use map-side joins.
  3. groupBy can't be avoided in our case, so a shuffle is expected, but the idea is to reduce it as much as possible.
  4. We did some optimization using the Spark UI. Caching DataFrames in specific places also helped improve the performance of our jobs.
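The techniques above can be sketched roughly as follows (the function names, partition count, and join key are all hypothetical; this is an illustration of the tuning ideas, not our actual code):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// 1. Parallel CSV write: repartition to control how many part files
//    are written concurrently, instead of coalescing to a single file.
def writeCsvParallel(df: DataFrame, path: String): Unit =
  df.repartition(32) // 32 is an assumed parallelism level
    .write
    .option("header", "true")
    .csv(path)

// 2. Map-side (broadcast) join: hint Spark to broadcast the small
//    dimension table so the join avoids shuffling the large side.
def joinMapSide(large: DataFrame, small: DataFrame, key: String): DataFrame =
  large.join(broadcast(small), Seq(key))

// 4. Cache a DataFrame that is reused by several downstream aggregations,
//    so it is computed once instead of per action.
def cacheForReuse(df: DataFrame): DataFrame =
  df.cache()
```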

dataset   size of CSV produced   time taken (before tuning)               time taken (after tuning)
DTZ       ~7 GB                  28 minutes                               6 minutes
ATZ       ~2.5 GB                12 minutes                               5.9 minutes
ZIP       ~2 GB                  10 minutes                               5 minutes
PCR       ~23 GB                 200+ minutes (failed due to OOM error)   11 minutes



After migration, we noticed the execution time of our batch dropped by 50-70%.


It could be a combination of more hardware and Spark vs. Hive. Spark and Hive have different hardware needs, but our new cluster is huge compared to the old MapR one, so it's likely a combination of both.



In some cases, we struggled with memory issues in Spark with no apparent solution apart from increasing the memory available on each node of the cluster. In theory, when a Spark process consumes all its memory it starts to spill to disk but continues processing data; in practice that isn't always the case, and sometimes it breaks.


We've gained a lot by migrating our old Hive aggregations to Spark. As I mentioned earlier, Hive is a very robust technology: your process can take time, but it completes most of the time. In Spark, you need a lot of memory resources to reach a similar level of completeness, but we have a big cluster now!

Other than the initial hiccups, it was a pleasure writing the application in Spark. I thoroughly enjoyed it, and I am now working on another project to convert an Apache Drill implementation to Spark.

