Thursday, October 24, 2019

Migrating Hive to Spark




Migrating Hive aggregates to Spark DataFrames

Overview




Hive became a very popular SQL solution on the big data platform, especially with Oracle/SQL developers. Hive is good at batch processing with a MapReduce execution engine; it can also use Spark or Tez as its execution engine. Spark, on the other hand, is fast becoming the de facto standard in the Hadoop ecosystem for batch processing. It is a parallel processing framework that helps us create programs that can run on a cluster.



In Q3 2019 our main focus was to convert one of our Hive implementations, written in 2017 for targeting and geo-spatial visualization, to Spark Scala. The project goal was to refactor the existing Hive SQL code to use the Spark Scala DataFrame API, and also to consolidate Hadoop processing and simplify multi-server file transfer. All the processing logic was in HiveQL, as the solution was implemented in Hive. The task at hand was to convert all the SQL to DataFrames.


Our Goal



The goals were:


1. Migrate the application from the MapR data lake to our own (Hortonworks-flavored) data lake.


2. Consolidate Hadoop processing at the source and simplify multi-host file transfers. Previously we received parquet files from another cluster for processing; now we generate the data and process it on the same data lake.

3. Redesign the old system, written in 2017, around a new set of tools and technologies. Hence we redesigned the solution using Spark/Scala as the processing engine and parquet (on HDFS) as storage.




Team 

It wasn't an easy start for us (a team of two), as we were a completely new team on new infrastructure. As with everyone, we needed to put in extra time to understand the new tools and technologies, and we were learning new terms almost every day for the initial month.

About Old System


The old application was written with Hive, Sqoop and shell scripts, and the jobs were scheduled by Oozie.


Dataflow 

We get parquet files
     -> External Hive tables with data from the RDBMS (Sqoop)
          -> Hive processing
            -> Export the resultant tables to CSV
                   -> Feed the CSV to the database


Old dataflow with Hive and Sqoop




New System



The new application was written in Spark (Scala), using dropbox (DI) for RDBMS data, and is scheduled via Stream (our proprietary scheduling platform, similar to Oozie).

New Dataflow 

    -> Input feed via layouts
        -> Spark DataFrame aggregations to process the data
           -> Spark to convert the parquet results to CSV
              -> Send the data to our SFTP side to feed it to the RDBMS





Input


We received plain CSV and converted it to parquet via Spark to feed into the Spark aggregates.
It's also important to do as much work as you can in the external relational database engine so that you only import the data you need.
The other input was already in parquet.
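As an illustration, here is a minimal sketch of that CSV-to-parquet step; the paths and options are placeholders, not the project's actual layout.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-to-parquet").getOrCreate()

val csvDf = spark.read
  .option("header", "true")       // first row holds column names
  .option("inferSchema", "true")  // let Spark infer column types
  .csv("/data/input/feed.csv")    // placeholder input path

csvDf.write.mode("overwrite").parquet("/data/input/feed_parquet")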

Output


We write the output in parquet and wrote an additional processing step in Spark to convert the parquet to CSV.
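A similar sketch for this output step, reusing the SparkSession from the sketch above and again with placeholder paths:

val resultDf = spark.read.parquet("/data/output/result_parquet")

resultDf.write
  .mode("overwrite")
  .option("header", "true")
  .csv("/data/output/result_csv")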


Processing

One simple example of the hundreds of SQLs converted for this project is shown below.

The original HiveQL:
select substr(DIG.GEOCODE, 1,5) as geocode, DIG.topic_id, DIG.DMA, sum(DIG.DMA_SCORE *GEOS.HHLD_S)/sum(GEOS.HHLD_S) as DMA_SCORE_rollup,
sum(DIG.NATIONAL_SCORE *GEOS.HHLD_S)/sum(GEOS.HHLD_S) as NATIONAL_SCORE_rollup
from IMP_DIG_interest_DTZ dig, imp_dig_rollup_geos geos
where DIG.GEOCODE = GEOS.GEOCODE
group by geocode,  DIG.topic_id, DIG.DMA;

and the equivalent implementation using the Spark DataFrame API.
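What follows is a representative sketch of that translation rather than the exact project code; it assumes the two tables are registered in the metastore and that `spark` is the active SparkSession, with table and column names mirroring the HiveQL above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("dig-rollup").enableHiveSupport().getOrCreate()

val dig  = spark.table("IMP_DIG_interest_DTZ")
val geos = spark.table("imp_dig_rollup_geos")

val rollup = dig.join(geos, dig("GEOCODE") === geos("GEOCODE"))
  .groupBy(substring(dig("GEOCODE"), 1, 5).as("geocode"), dig("topic_id"), dig("DMA"))
  .agg(
    (sum(dig("DMA_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S"))).as("DMA_SCORE_rollup"),
    (sum(dig("NATIONAL_SCORE") * geos("HHLD_S")) / sum(geos("HHLD_S"))).as("NATIONAL_SCORE_rollup")
  )

The join condition, the substr on GEOCODE and the weighted averages map one-to-one onto DataFrame operations, which made most of the conversions fairly mechanical.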





Performance tuning

I do want to touch on performance, as we faced slowness initially, especially while writing CSV files from parquet. Some of the learnings are as follows (a rough code sketch of a few of them follows the list):


  1. Use parallelism when writing the CSV.
  2. Optimize dataframe calls to use map-side (broadcast) joins.
  3. groupBy can't be avoided in our case, so a shuffle is expected, but the idea is to reduce it as much as possible.
  4. We did some optimization using the Spark UI. Caching dataframes in specific places also helped improve the performance of our jobs.
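A rough illustration of points 1, 2 and 4, reusing the dataframes from the processing sketch above; the dataframe names, partition count and output path are placeholders rather than the project's actual values.

import org.apache.spark.sql.functions.broadcast

// Hint the smaller geo lookup into a map-side (broadcast) join instead of a shuffle join.
val joined = dig.join(broadcast(geos), dig("GEOCODE") === geos("GEOCODE"))

// Cache a dataframe that is reused by several downstream aggregations.
joined.cache()

// Write the CSV from multiple partitions so several tasks write in parallel,
// rather than coalescing everything to a single file.
joined.repartition(32)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("/data/output/rollup_csv")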

Dataset | Size of CSV produced | Time taken (before tuning)             | Time taken (after tuning)
DTZ     | ~7 GB                | 28 minutes                             | 6 minutes
ATZ     | ~2.5 GB              | 12 minutes                             | 5.9 minutes
ZIP     | ~2 GB                | 10 minutes                             | 5 minutes
PCR     | ~23 GB               | 200+ minutes (failed due to OOM error) | 11 minutes



After migration, we noticed that we reduced the execution time of our batch jobs by 50%-70%.


It could be a combination of more hardware and Spark vs Hive. Spark and Hive have different hardware needs, but our new cluster is huge compared to the old MapR one, so it is likely a combination of both.



In some cases, we struggled with memory issues in Spark with no apparent solution apart from increasing the memory available on each node of the cluster. In theory, if the Spark process consumes all the memory, it starts to spill to disk but continues processing data; in practice this isn't always the case, and sometimes it breaks.


We've gained a lot by migrating our old Hive aggregations to Spark. As I mentioned earlier, Hive is a very robust technology: your process can take time, but it does complete most of the time. In Spark, you need a lot of memory to get a similar level of reliability, but we have a big cluster now!

Also, other than the initial hiccups, it was a pleasure writing an application using Spark. I thoroughly enjoyed it, and I am now working on another project to convert an Apache Drill implementation to Spark.


Friday, March 29, 2019

Recommendation engine using PredictionIO- Basics, Challenges


Recommendation - Collaborative Filtering

Collaborative Filtering techniques explore the idea that relationships exist between products and people's interests. As the Netflix Prize competition has demonstrated, matrix factorization models are superior to classic nearest-neighbor techniques for producing product recommendations, allowing the incorporation of additional information such as implicit feedback, temporal effects, and confidence levels.

Netflix vs Amazon recommendations -

One popular example of Collaborative Filtering is Netflix. Everything on their site is driven by their customers' selections, which, if made frequently enough, get turned into recommendations. Netflix orders these recommendations in such a way that the highest ranking items are more visible to users, in hopes of getting them to select those recommendations as well.

Another popular example is amazon.com. Amazon's item recommendation system is based on what you've previously purchased, as well as the frequency with which you've looked at certain books or other items during previous visits to their website. The advantage of using Collaborative Filtering is that users get broader exposure to many different products they might be interested in. This exposure encourages users towards continual usage or purchase of the product.


I built a recommendation engine using PredictionIO. If you are interested in learning more about the implementation, you can send me an email and I can respond with details on how to design events etc.


I will just give pointers here - you can find the code in my github repo - https://github.com/pawan-agnihotri

PredictionIO - Overview

What: Apache PredictionIO® is a machine learning framework - a machine learning server built on top of Apache Spark, Spark MLlib and HBase.
  Apache License, Version 2.0.
  Written in Scala, based on Spark, and implements the Lambda architecture.
  Supports Spark MLlib and OpenNLP.
  Supports batch and real-time ingestion and predictions.
  Responds to dynamic queries in real time via a REST API.

Who/When:
  The company was founded in 2013 and is based in Walnut, California.

  Acquired by Salesforce in Feb 2016 and currently used in Salesforce Einstein (Salesforce's AI initiative).



Product Recommender - built using PredictionIO
     Build a model that produces individualized recommendations and serves them in real time.
     User inputs: like/buy/view events, prediction query
     Output: prediction result

Transaction Classifier
     Build a model that classifies user transactions (att0, att1, att2) into multiple categories (0-low, 1-medium, 2-high, 3-very high).
     User inputs: events, prediction query
     Output: prediction result





Goal: Build a machine learning model that can serve predictions in real time.

Step 1:  Create the model using Spark MLlib
Step 2:  Build the model
Step 3:  Create test/training data
Step 4:  Train and deploy the model
Step 5:  Use the REST API (sketched below)
  Post event data to the Event Server (in real time)
  Make predictions (in real time)

Step 6:  Incorporate the predictions into your application
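To make Step 5 concrete, here is a small sketch of posting an event to the event server and querying a deployed engine over REST. It assumes the default local ports (7070 for the event server, 8000 for the engine); ACCESS_KEY, the user and item IDs, and the event type are placeholders.

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object PioRestSketch {
  // POST a JSON payload and return the response body.
  def post(urlStr: String, json: String): String = {
    val conn = new URL(urlStr).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(json.getBytes(StandardCharsets.UTF_8))
    scala.io.Source.fromInputStream(conn.getInputStream).mkString
  }

  def main(args: Array[String]): Unit = {
    // Send a "view" event for user u1 on item i42 to the event server.
    val event =
      """{"event":"view","entityType":"user","entityId":"u1",
        |"targetEntityType":"item","targetEntityId":"i42"}""".stripMargin
    println(post("http://localhost:7070/events.json?accessKey=ACCESS_KEY", event))

    // Ask the deployed engine for 4 recommendations for user u1.
    println(post("http://localhost:8000/queries.json", """{"user":"u1","num":4}"""))
  }
}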

Challenges - 

1. One of them is Data Sparsity. A large dataset will most likely result in a large, sparse user-item matrix, which may provide a good level of accuracy but also poses a risk to speed. In comparison, a small dataset would result in faster speeds but lower accuracy.

2. Cold Start
Another issue to keep in mind is something called 'cold start': new users do not have a sufficient number of ratings for the system to give an accurate recommendation.

3. Scalability - an increase in volume causes delays.

4. Synonyms
The term 'synonyms' refers to items that are similar but labeled differently, and thus treated as different by the recommendation system. An example of this would be 'backpack' vs 'knapsack'.

5. Gray Sheep
The term 'gray sheep' refers to users whose opinions don't necessarily 'fit' or align with any specific grouping. These users do not consistently agree or disagree on products or items, making recommendations of little benefit to them.

6. Shilling Attacks
Shilling attacks are an abuse of the system: rating certain products high and other products low regardless of personal opinion, thereby causing those products to be recommended more (or less) often.

7. Long Tail effect - popular items are rated/viewed frequently. This creates a cycle where new items remain in the shadow of the popular items and rarely get recommended.


It is common in many real-world use cases to only have access to implicit feedback (e.g. views, clicks, purchases, likes, shares etc.). The approach used in spark.mllib to deal with such data is taken from Collaborative Filtering for Implicit Feedback Datasets. Essentially, instead of trying to model the matrix of ratings directly, this approach treats the data as numbers representing the strength in observations of user actions (such as the number of clicks, or the cumulative duration someone spent viewing a movie). Those numbers are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.
Rank - it is purely a characteristic of the data. The rank refers to the presumed latent or hidden factors. For example, if you were measuring how much different people liked movies and tried to cross-predict them, then you might have three fields: person, movie, number of stars. Now, let's say that you were omniscient and knew the absolute truth: that in fact all the movie ratings could be perfectly predicted by just 3 hidden factors - sex, age and income. In that case the "rank" of your run should be 3.
Of course, you don't know how many underlying factors, if any, drive your data, so you have to guess. The more you use, the better the results, up to a point, but the more memory and computation time you will need.
One way to work it is to start with a rank of 5-10, then increase it, say 5 at a time, until your results stop improving. That way you determine the best rank for your dataset by experimentation (a rough sketch of this follows).
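A rough sketch of that rank search with spark.mllib ALS, assuming `training` and `validation` are existing RDD[Rating] splits of the data; the rank values, iteration count and lambda are illustrative only.

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Root-mean-square error of a model's predictions against held-out ratings.
def rmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
  val predictions = model
    .predict(data.map(r => (r.user, r.product)))
    .map(p => ((p.user, p.product), p.rating))
  val actualAndPredicted = data
    .map(r => ((r.user, r.product), r.rating))
    .join(predictions)
    .values
  math.sqrt(actualAndPredicted.map { case (a, p) => (a - p) * (a - p) }.mean())
}

// Increase the rank in steps and watch where the validation error stops improving.
val errorsByRank = Seq(5, 10, 15, 20).map { rank =>
  val model = ALS.train(training, rank, 20, 0.01)   // 20 iterations, lambda 0.01
  (rank, rmse(model, validation))
}
errorsByRank.foreach { case (rank, err) => println(s"rank=$rank RMSE=$err") }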


spark.mllib uses the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in spark.mllib has the following parameters:
  • numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
  • rank is the number of latent factors in the model.
  • iterations is the number of iterations of ALS to run. ALS typically converges to a reasonable solution in 20 iterations or less.
  • lambda specifies the regularization parameter in ALS.
  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.


MatrixFactorizationModel(rank, userFeatures, productFeatures)

{
  "name": "als",
  "params": {
    "rank": 10,
    "numIterations": 20,
    "lambda": 0.01,
    "seed": 3
  }
}


// `ap` holds the algorithm parameters from the JSON above, `seed` is assumed to
// come from ap.seed, and `mllibRatings` is the RDD of ratings built from the events.
val implicitPrefs = false
val als = new ALS()
als.setUserBlocks(-1)
als.setProductBlocks(-1)
als.setRank(ap.rank)
als.setIterations(ap.numIterations)
als.setLambda(ap.lambda)
als.setImplicitPrefs(implicitPrefs)
als.setAlpha(1.0)
als.setSeed(seed)
als.setCheckpointInterval(10)
val m = als.run(mllibRatings)

Advantages:

1. Hierarchical matrix co-clustering / factorization
2. Preference versus intention - distinguishes between liking an item and being interested in seeing/purchasing it (it is worthless to recommend an item a user has already bought)
3. Scalability
4. Relevant objectives - predicting the actual rating may be useless (missing-at-random assumption)

Drawbacks of our model:

1. Multiple individuals using the same account - individual preferences get mixed together
2. Cold start (new users) 
