Tuesday, January 23, 2018

Spark Dataframes



Concept of dataframes  -  A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.
So, a DataFrame has additional metadata due to its tabular format, which allows Spark to run certain optimizations on the finalized query. 
                                                       or
A DataFrame is equivalent to a table in an RDBMS and can also be manipulated in ways similar to the "native" distributed collections, RDDs. Unlike RDDs, DataFrames keep track of the schema and support various relational operations that lead to more optimized execution.
Each DataFrame object represents a logical plan, but because DataFrames are "lazy", no execution occurs until the user calls a specific "output operation" (an action).
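
The lazy behavior described above can be sketched in plain Python. This is only an analogy, not Spark code: the LazyTable class and its method names are hypothetical, chosen to mirror filter, select and collect. Transformations just append a step to a recorded plan; the action is what finally runs it.

```python
class LazyTable:
    """Toy stand-in for a DataFrame: records steps, runs them only on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows    # source data: a list of dicts
        self._plan = plan    # pending (kind, function) steps, nothing executed yet

    def filter(self, predicate):
        # Transformation: returns a new object with a longer plan, runs nothing.
        return LazyTable(self._rows, self._plan + (("filter", predicate),))

    def select(self, *names):
        # Transformation: remembers which columns to keep.
        project = lambda row: {n: row[n] for n in names}
        return LazyTable(self._rows, self._plan + (("select", project),))

    def collect(self):
        # Action: only now is the recorded plan executed, step by step.
        rows = self._rows
        for kind, fn in self._plan:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            else:
                rows = [fn(r) for r in rows]
        return rows


calls = []  # records every time the predicate is actually evaluated


def older_than_28(row):
    calls.append(row["name"])
    return row["age"] > 28


people = LazyTable([{"name": "pawan", "age": 30}, {"name": "sapan", "age": 26}])
query = people.filter(older_than_28).select("name")
evaluated_before_action = len(calls)  # the predicate has not been called yet
result = query.collect()              # the action triggers execution
```

Because the whole plan is known before anything runs, a real engine has the chance to rearrange it first, which is where the optimizations mentioned above come from.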

                                                       
DataFrames are distributed across the worker nodes. They are immutable and are the primary abstraction in Spark SQL.

You can think of a DataFrame as a special type of RDD, one that carries a schema. An RDD, on the other hand, is merely a Resilient Distributed Dataset: more of a black box of data that cannot be optimized, because the operations that can be performed against it are not as constrained.

However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method.


In general, it is recommended to use a DataFrame where possible because of the built-in query optimization.



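You can easily switch from a DataFrame to an RDD (1) or from an RDD to a DataFrame (2):

(1) val sampleDF = sqlContext.read.json("hdfs://localhost:9000/jsondata.json") // loads a DataFrame (Spark 1.4+; jsonFile is deprecated)
    val sampleRDD = sampleDF.rdd // DataFrame to RDD[Row]

(2) val sample_DF = sqlContext.createDataFrame(sampleRDD, sampleDF.schema) // RDD[Row] back to a DataFrame; toDF() applies to RDDs of tuples or case classes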



The main disadvantage of RDDs is that they don’t perform particularly well. Whenever Spark needs to distribute the data within the cluster or write the data to disk, it does so using Java serialization by default (although it is possible to use Kryo as a faster alternative in most cases). Serializing individual Java and Scala objects is expensive and requires sending both data and structure between nodes (each serialized object contains the class structure as well as the values).

There is also the overhead of garbage collection that results from creating and destroying individual objects.

Spark 1.3 introduced a new DataFrame API which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and only pass data between nodes, in a much more efficient way than using Java serialization. 

There are also advantages when performing computations in a single process, as Spark can serialize the data into off-heap storage in a binary format and then perform many transformations directly on this off-heap memory, avoiding the garbage-collection costs associated with constructing individual objects for each row in the data set. Because Spark understands the schema, there is no need to use Java serialization to encode the data.
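
To make the difference concrete, here is a rough plain-Python analogy, not Spark's actual binary format. It uses pickle as a stand-in for per-object serialization, where every record carries its own field names, and struct as a stand-in for a schema-driven layout, where the structure is stated once and only values are packed. The ROW_FORMAT layout (a 16-byte name plus a 32-bit age) is an assumption made up for this sketch.

```python
import pickle
import struct

records = [{"name": "pawan", "age": 30}, {"name": "sapan", "age": 26}]

# Per-object style: each record is serialized on its own, so every payload
# repeats the field names ("name", "age") next to the values.
per_object = [pickle.dumps(r) for r in records]
object_bytes = sum(len(payload) for payload in per_object)

# Schema style: the layout is declared once, outside the data.
# Hypothetical schema: name as 16 raw bytes, age as an unsigned 32-bit int.
ROW_FORMAT = "<16sI"
packed = b"".join(
    struct.pack(ROW_FORMAT, r["name"].encode("utf-8"), r["age"]) for r in records
)
packed_bytes = len(packed)

# Reading the rows back needs only the schema and the packed bytes.
decoded = [
    (name.rstrip(b"\x00").decode("utf-8"), age)
    for name, age in struct.iter_unpack(ROW_FORMAT, packed)
]
```

In this toy case packed_bytes comes out smaller than object_bytes, and the packed buffer is a single bytes object rather than one Python object per row, which loosely mirrors the garbage-collection point above.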

There are key data management concepts that you need to know.
  • The first is that a data model is a collection of concepts for describing data.
  • The second is that a schema is a description of a particular collection of data that uses a given data model
The relational database model is a widely used data model in which a relation is a table with rows and columns, and each relation has a schema defining the fields in its columns.

Instance - the actual data at a given time

#ofRows - cardinality
#ofColumns - degree
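
As a small illustration of these terms, the sketch below models a relation in plain Python (no Spark involved). The variable names and the conforms helper are hypothetical, introduced only for this example.

```python
# Schema: the description of the relation (column name and type for each field).
schema = [("name", str), ("age", int)]

# Instance: the actual rows held by the relation at this moment.
instance = [("pawan", 30), ("sapan", 26)]


def conforms(row, schema):
    """True when the row has one value of the declared type per column."""
    return len(row) == len(schema) and all(
        isinstance(value, col_type) for value, (_, col_type) in zip(row, schema)
    )


cardinality = len(instance)  # number of rows
degree = len(schema)         # number of columns
all_rows_valid = all(conforms(row, schema) for row in instance)
```

The schema stays fixed while the instance changes over time, so the degree is a property of the schema and the cardinality is a property of the current instance.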
  • Each row in a DataFrame is a Row object.
from pyspark.sql import Row
row = Row(name="Pawan", age=30)
and its fields can be accessed via
row.name or row['name']

  • Creating a DataFrame from a Python list


1. You create a data frame from a data source, from disk or from a Python object.
data = [('pawan',30),('sapan',26)]
df = sqlContext.createDataFrame(data)
 or assign the column names (schema)
df = sqlContext.createDataFrame(data, ['name','age'])

2. You apply transformations to that data frame, like select and filter.
df2 = df.select(df.name, df.age)

or
df2 = df.filter(df.age > 28)

3. Cache a DataFrame that you will reuse.
df.cache()  # keep it in memory for reuse; otherwise it is recomputed from the source on every action

4. Then you apply actions to the data frame, like show and count.
df2.collect()  # avoid collect() on large results in production: it can crash the driver (out of memory) if the driver can't hold the data. Use show() or take() instead.

Point to note 

  • When you're dealing with grouped data (the result of groupBy), count is a transformation that returns a new DataFrame, but when you're dealing with DataFrames directly, count is an action.
  • Transformations run on the executors; actions run on the executors and the driver.
  • Use the unionAll() function to combine DataFrames (in Spark 2.0 and later it is deprecated in favor of union()).
  • Cache DataFrames that are reused.
  • Never use collect on large results in production.

When you use DataFrames or Spark SQL, you are building up a query plan. Each transformation you apply to a DataFrame adds some information to the query plan. When you finally call an action, which triggers execution of your Spark job, several things happen:
  1. Spark's Catalyst optimizer analyzes the query plan (called an unoptimized logical query plan) and attempts to optimize it. Optimizations include (but aren't limited to) rearranging and combining filter() operations for efficiency, converting Decimal operations to more efficient long integer operations, and pushing some operations down into the data source (e.g., a filter() operation might be translated to a SQL WHERE clause, if the data source is a traditional SQL RDBMS). The result of this optimization phase is an optimized logical plan.
  2. Once Catalyst has an optimized logical plan, it then constructs multiple physical plans from it. Specifically, it implements the query in terms of lower level Spark RDD operations.
  3. Catalyst chooses which physical plan to use via cost optimization. That is, it determines which physical plan is the most efficient (or least expensive), and uses that one.
  4. Finally, once the physical RDD execution plan is established, Spark actually executes the job.
You can examine the query plan using the explain() function on a DataFrame. By default, explain() only shows you the final physical plan; however, if you pass it an argument of True, it will show you all phases.
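
One of the optimizations mentioned above, combining filter() operations, can be sketched in plain Python. This is a toy model, not Catalyst: the plan is just a list of (kind, argument) steps, and combine_filters and run are hypothetical helpers written for this illustration.

```python
def combine_filters(plan):
    """Merge runs of adjacent filter steps into a single filter step."""
    optimized = []
    for kind, arg in plan:
        if kind == "filter" and optimized and optimized[-1][0] == "filter":
            previous = optimized.pop()[1]
            # Default arguments bind both predicates now, so later loop
            # iterations cannot change what the merged predicate refers to.
            merged = lambda row, first=previous, second=arg: first(row) and second(row)
            optimized.append(("filter", merged))
        else:
            optimized.append((kind, arg))
    return optimized


def run(plan, rows):
    """Execute a plan step by step over a list of dict rows."""
    for kind, arg in plan:
        if kind == "filter":
            rows = [r for r in rows if arg(r)]
        elif kind == "select":
            rows = [{name: r[name] for name in arg} for r in rows]
    return rows


rows = [{"name": "pawan", "age": 30}, {"name": "sapan", "age": 26}]

# Unoptimized plan: two separate filters followed by a projection.
plan = [
    ("filter", lambda r: r["age"] > 20),
    ("filter", lambda r: r["age"] < 29),
    ("select", ["name"]),
]

optimized = combine_filters(plan)  # the two filters become one step
```

Both plans return the same rows, but the optimized one makes a single filtering pass over the data instead of two, which is the kind of rewrite a logical optimizer looks for.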


