Migration from Hive aggregates to Spark DataFrames
Overview
Hive became a very popular SQL solution on big data platforms, especially with Oracle/SQL developers. Hive is good at batch processing with a MapReduce execution engine; it can also use Spark or Tez as its execution engine. Spark, on the other hand, is fast becoming the de facto standard in the Hadoop ecosystem for batch processing. It's a parallel processing framework that helps us create programs that can run on a cluster.
In Q3 2019 our main focus was to convert one of our Hive implementations, written in 2017, to Spark Scala for targeting and geo-spatial visualization. The project goal was to refactor the existing Hive SQL code to use the Spark Scala DataFrame API, and also to consolidate Hadoop processing and simplify multi-server file transfers. All the processing logic was in HiveQL, as the solution was implemented in Hive. The task at hand was to convert all that SQL to DataFrames.
Our Goal
The goals were:
1. To migrate the application from the MapR datalake to our own datalake (Hortonworks flavored).
2. To consolidate Hadoop processing at the source and simplify multi-host file transfers. Previously we received parquet files from another cluster for processing; now we generate the data and process it on the same datalake, so everything runs on one cluster.
3. To take the opportunity to redesign the old system written in 2017 with a new set of tools and technologies. Hence we redesigned the solution using Spark/Scala as the processing engine and parquet (on HDFS) as storage.
Team
It wasn't an easy start for us (a team of two), as we were a completely new team on a new infrastructure. As with everyone, we also needed to put in extra time to understand the new tools and technologies, learning new terms almost every day for the first month.

About Old System
The old application was written with Hive, Sqoop, and shell scripts, and the jobs were scheduled by Oozie.
Dataflow
We get parquet files
-> external Hive tables loaded with data from the RDBMS (Sqoop)
-> Hive processing
-> export the resultant tables to CSV
-> feed the CSV to the database
Old dataflow with Hive and Sqoop
New System
The new application was written in Spark (Scala), using dropbox (DI) for RDBMS data, and scheduled via stream (our proprietary scheduling platform, similar to Oozie).
New Dataflow
-> input feed via layouts
-> Spark DataFrame aggregates to process the data
-> Spark to convert the parquet results to CSV
-> send the data to our SFTP site to feed it into the RDBMS
Input
We received plain CSV and converted it to parquet via Spark to feed into the Spark aggregates (a sketch follows below).
It's also important to do as much work as you can in the external relational database engine, so that you only import the data you actually need.
- The other input was already in parquet.
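As a rough illustration, the CSV-to-parquet step looks something like the sketch below; the paths and read options here are hypothetical, not our production values.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("CsvToParquet").getOrCreate()

// Read the plain CSV feed; header/schema options depend on the actual layout.
val feed = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/input/feed.csv") // hypothetical path

// Persist as parquet so the downstream Spark aggregates read a columnar format.
feed.write.mode("overwrite").parquet("/data/staging/feed") // hypothetical path
```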
Output
We wrote the output in parquet, plus an additional processing step in Spark to convert the parquet to CSV, sketched below.
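A minimal sketch of that conversion step (paths again hypothetical); the partition count decides how many CSV part files are written in parallel, which matters for the tuning discussed later.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ParquetToCsv").getOrCreate()

// Read the aggregated parquet results and write them back out as CSV.
// repartition(n) sets the number of output part files (and write parallelism).
spark.read
  .parquet("/data/output/rollup") // hypothetical path
  .repartition(32)                // illustrative value; tune to the cluster
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("/data/export/rollup_csv") // hypothetical path
```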
Processing
One simple example, out of the hundreds of SQLs converted for this project, is below, going from this HiveQL:
```sql
select substr(DIG.GEOCODE, 1, 5) as geocode,
       DIG.topic_id,
       DIG.DMA,
       sum(DIG.DMA_SCORE * GEOS.HHLD_S) / sum(GEOS.HHLD_S) as DMA_SCORE_rollup,
       sum(DIG.NATIONAL_SCORE * GEOS.HHLD_S) / sum(GEOS.HHLD_S) as NATIONAL_SCORE_rollup
from IMP_DIG_interest_DTZ dig, imp_dig_rollup_geos geos
where DIG.GEOCODE = GEOS.GEOCODE
group by geocode, DIG.topic_id, DIG.DMA;
```
to an implementation using the Spark DataFrame API:
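A sketch of the DataFrame version, reconstructed from the SQL above; the production code may differ in structure, but the join, the grouping on the 5-character geocode prefix, and the household-weighted averages are the same:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{substring, sum}

val spark = SparkSession.builder().appName("DigRollup").enableHiveSupport().getOrCreate()

val dig  = spark.table("IMP_DIG_interest_DTZ")
val geos = spark.table("imp_dig_rollup_geos")

// Join on GEOCODE, roll up to the 5-character geocode prefix, and compute
// household-weighted averages of the DMA and national scores.
val rollup = dig
  .join(geos, dig("GEOCODE") === geos("GEOCODE"))
  .groupBy(substring(dig("GEOCODE"), 1, 5).as("geocode"), dig("topic_id"), dig("DMA"))
  .agg(
    (sum(dig("DMA_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S"))).as("DMA_SCORE_rollup"),
    (sum(dig("NATIONAL_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S"))).as("NATIONAL_SCORE_rollup")
  )
```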
Performance tuning
I do want to touch on performance, as we initially faced slowness, especially while writing CSV files from parquet. Some of our learnings are as follows:
- Use parallelism when writing CSV (see the sketch after this list).
- Optimize DataFrame calls to use map-side (broadcast) joins.
- groupBy can't be avoided in our case, so a shuffle is expected, but the idea is to reduce it as much as possible.
- We did some optimization using the Spark UI. Caching DataFrames in specific places also helped improve the performance of our jobs.
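A combined sketch of those tunings, reusing the dig and geos DataFrames from the example above; the broadcast choice, partition count, and output path are illustrative:

```scala
import org.apache.spark.sql.functions.broadcast

// Map-side join: hint Spark to broadcast the smaller geos table so the
// large dig table is joined without a full shuffle.
val joined = dig.join(broadcast(geos), dig("GEOCODE") === geos("GEOCODE"))

// Cache an intermediate DataFrame that several downstream aggregations reuse,
// so it is computed once rather than once per action.
joined.cache()

// Parallel CSV write: more partitions mean more tasks writing part files
// concurrently (coalesce(1) for a single file is the slow path to avoid).
joined.repartition(32).write.option("header", "true").csv("/data/export/tuned") // hypothetical path
```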
| Dataset | Size of CSV produced | Time taken (before tuning) | Time taken (after tuning) |
|---|---|---|---|
| DTZ | ~7 GB | 28 minutes | 6 minutes |
| ATZ | ~2.5 GB | 12 minutes | 5.9 minutes |
| ZIP | ~2 GB | 10 minutes | 5 minutes |
| PCR | ~23 GB | 200+ minutes (failed due to OOM error) | 11 minutes |
After the migration, we reduced the execution time of our batches by 50%-70%.
It could be a combination of more hardware and Spark vs Hive. Spark and Hive have different hardware needs, but our new cluster is huge compared to the old MapR one, so it's a combination of both.
In some cases, we struggled with memory issues in Spark with no apparent solution apart from increasing the memory available in each node of the cluster. In theory, if the Spark process consumes all its memory it starts to spill to disk and continues processing data, but that isn't always the case; sometimes it simply breaks.
We've gained a lot by migrating our old Hive aggregations to Spark. As I mentioned earlier, Hive is a very robust technology, so your processes can take time but they do complete most of the time. In Spark, you need a lot of memory resources to reach a similar level of completeness, but we have a big cluster now!
Other than the initial hiccups, it was a pleasure writing the application using Spark. I thoroughly enjoyed it, and I am now working on another project to convert an Apache Drill implementation to Spark.