Tuesday, January 23, 2018

Spark Dataframes



Concept of dataframes  -  A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.
So, a DataFrame has additional metadata due to its tabular format, which allows Spark to run certain optimizations on the finalized query. 
                                                       or
A DataFrame is equivalent to a table in RDBMS and can also be manipulated in similar ways to the "native" distributed collections in RDDs. Unlike RDDs , Dataframes keep track of the schema and support various relational operations that lead to more optimized execution. 
Each DatafRame object represents a logical plan but because of their "lazy" nature no execution occurs until the user calls a specific "output operation".

                                                       
Dataframes are distributed across worker. It is the primary abstraction in Spark SQL and immutable. 

You can think of Dataframes as special type of RDD.  Dataframes keep track of the schema and support various relational operations that lead to more optimized execution.   An RDD, on the other hand, is merely a Resilient Distributed Dataset that is more of a blackbox of data that cannot be optimized as the operations that can be performed against it are not as constrained.

However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method


In general it is recommended to use a DataFrame where possible due to the built in query optimization.



You can easliy switch from dataframes to RDD (1) or RDD to dataframes(2)

(1) val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")

(2)val sample_DF = sampleRDD.toDF()



The main disadvantage to RDDs is that they don’t perform particularly well whenever Spark needs to distribute the data within the cluster, or write the data to disk, it does so using Java serialization by default (although it is possible to use Kryo as a faster alternative in most cases). The overhead of serializing individual Java and Scala objects is expensive and requires sending both data and structure between nodes (each serialized object contains the class structure as well as the values). 

There is also the overhead of garbage collection that results from creating and destroying individual objects.

Spark 1.3 introduced a new DataFrame API which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and only pass data between nodes, in a much more efficient way than using Java serialization. 

There are also advantages when performing computations in a single process as Spark can serialize the data into off-heap storage in a binary format and then perform many transformations directly on this off-heap memory, avoiding the garbage-collection costs associated with constructing individual objects for each row in the data set. Because Spark understands the schema, there is no need to use Java serialization to encode the data

There are key data management concepts that you need to know.
  • The first is that a data model is a collection of concepts for describing data.
  • The second is that a schema is a description of a particular collection of data that uses a given data model
Relational database model is widely used model wherein Relation is a table with rows and columns and each relation has a schema defining fields in columns.

Instance - actual data at given time

#ofRows - Cardinality
#ofColumns - degree
  • Each row is a dataframe is a Row object.
row = Row(name="Pawan", age=30)
and can be accessed via
row.name or row['name']

  • Creating a dataframe from Python list


1. You create a data frame from a data source, from disk or from a Python object.
data = [('pawan',30),('sapan',26)]
df = sqlContext.createDataFrame(data)
 or assign the name of the columns(schema)
df = sqlContext.createDataFrame(data, ['name','age'])

2. You apply transformations to that data frame, like select and filter.
df2 = df.select(df.name, df.age)

or
df2 = df.filter(df.age > 28)

3. Cache some dataframe for reuse 
df.cache() //save it in memory and reuse it else it will read from the disk every time

4.  then you apply actions to the data frame,like show and count.
df2.collect(); //never use collect in production application - it will crash the driver machine(Out of memory) if it cant hold the data. Use show() or take() instead.

Point to note 

  • When you're dealing with group data, count is a transformation, but when you're dealing with data frames, count is an action.
  • Transformers runs at executors and Actions runs at executors and driver. 
  • use UnionAll() function to combine the dataframes. 
  • Cache if dataframes are reused.
  • never use collect in production

When you use DataFrames or Spark SQL, you are building up a query plan. Each transformation you apply to a DataFrame adds some information to the query plan. When you finally call an action, which triggers execution of your Spark job, several things happen:
  1. Spark's Catalyst optimizer analyzes the query plan (called an unoptimized logical query plan) and attempts to optimize it. Optimizations include (but aren't limited to) rearranging and combining filter() operations for efficiency, converting Decimal operations to more efficient long integer operations, and pushing some operations down into the data source (e.g., a filter() operation might be translated to a SQL WHERE clause, if the data source is a traditional SQL RDBMS). The result of this optimization phase is an optimized logical plan.
  2. Once Catalyst has an optimized logical plan, it then constructs multiple physical plans from it. Specifically, it implements the query in terms of lower level Spark RDD operations.
  3. Catalyst chooses which physical plan to use via cost optimization. That is, it determines which physical plan is the most efficient (or least expensive), and uses that one.
  4. Finally, once the physical RDD execution plan is established, Spark actually executes the job.
You can examine the query plan using the explain() function on a DataFrame. By default, explain() only shows you the final physical plan; however, if you pass it an argument of True, it will show you all phases.



Friday, January 5, 2018

MemSQL - database for real-time analytics


MemSQL


MemSQL is relational, distributed, in memory SQL database platform for real-time analytics.



MemSQL is a proprietary closed-source distributed relational/sql database with both free community and paid enterprise edition

MemSQL enables sub-second refresh of dashboards with real-time data for improved accuracy of positions and risks to drive portfolio growth and customer satisfaction.


MemSQL is a two-tiered architecture consisting of aggregators and leafs.


Aggregators are cluster-aware query routers that act as a gateway into 

the distributed system.The only data they store is cluster metadata. 
The leaf nodes store and compute data .

Overview


  • Distributed SQL database, fully ACID, JSON and Geo Spatial support
  • Rely on main memory for storage.  It can spill to disk or pin data in-memory.
  • Need lot of memory - its a trade off but its cheaper today (every year 40% decrease). Cache is a new RAM, RAM is a disk and disk is
    the new tape. Leverage SSDs (no random write). NVRAM or non-volatile RAM
    is coming sometimes soon.
  • Use MySQL wire protocol.
  • seemless integration with Kafka
  • 10 million upserts per second

Architecture


  • memsqld,
  • aggregators
  • leaf(holds partitions) nodes

Aggregator node stores metadata and leaf node data in partitions 

A partition is an indivisible slice of data that can be replicated and

moved around the cluster.

Agg and Leaf workload


Communication and Ports



Sharding -
insert into tab01 values ('geocode','profileid, .....)
hash("geocode|profileid") % partition  = partition no. (say 8)

Code Example - Spark Aggregates RDBMS and Memsql data

Road Map

RoadMap

MemSQL not for 

  • MemSQL is not designed to be a persistence store (Hbase) or “data lake”
  • MemSQL supports extremely fast, distributed “READ-COMMITTED” transactions,                 but it is not suitable for applications which require “SERIALIZABLE” transactions.

Dependency
<dependency
<artifactId>memsql-connector_2.11</artifactId>
<version>2.0.2</version
</dependency>
/*using memsql connector with Apache spark. In a program,I tried to mix a streaming data coming to memsql and existing database table and join them and put the result for user dashboard. */



package com.valassis.msg.consumer;
import java.io.IOException; import java.util.Properties; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class MemsqlRDBMSConsumer { public static void main(String[] args) throws InterruptedException, IOException { SparkSession spark = SparkSession.builder() .appName("Java Spark MemSQL") .config("spark.memsql.host", "localhost") .config("spark.memsql.port", "3306") .config("spark.memsql.user", "root") .config("spark.memsql.password", "memsql123") .config("spark.memsql.defaultDatabase", "impower_pawan") .getOrCreate(); /* * Dataset<Row> views = spark.read() * .format("com.memsql.spark.connector") .option("query", * "select user, HOST from information_schema.users") .load(); * views.show(); * views.printSchema(); */ Dataset<Row> result = spark .read() .format("com.memsql.spark.connector") .option("query", "select * from impower_pawan.geofootprintmaster") .load(); result.show(); result.printSchema(); result.createOrReplaceTempView("memsql"); String url = "jdbc:oracle:thin:@bruno_bruno1.val.vlss.local:1587:BRUNO1"; String table = "SDR_PM.PA_STORES"; Properties connectionProperties = new Properties(); connectionProperties.put("driver", "oracle.jdbc.driver.OracleDriver"); connectionProperties.put("user", "sdr_pm"); connectionProperties.put("password", "pm007"); Dataset<Row> jdbcDF2 = spark.read().jdbc(url, table, connectionProperties); jdbcDF2.show(); jdbcDF2.createOrReplaceTempView("geo"); Dataset<Row> sqlDF = spark .sql("SELECT b.id, a.geo_profile_id, a.name, b.value FROM geo a, memsql b where b.id = a.geo_profile_id"); sqlDF.show(); } }


Websphere Dummy certificate expired - DummyServerKeyFile.jks , DummyServerTrustFile.jks

If you faced issue with ibm provided dummy certificate expired just like us and looking for the solution.  This blog is for you.  You can re...