Concept of DataFrames - A DataFrame is a table, or two-dimensional array-like structure, in which each column contains measurements of one variable and each row contains one case.
Because of this tabular format, a DataFrame carries additional metadata (its schema), which allows Spark to run certain optimizations on the finalized query.
or
A DataFrame is equivalent to a table in an RDBMS and can also be manipulated in similar ways to the "native" distributed collections in RDDs. Unlike RDDs, DataFrames keep track of the schema and support various relational operations that lead to more optimized execution.
Each DataFrame object represents a logical plan, but because of its "lazy" nature no execution occurs until the user calls a specific "output operation" (an action).
DataFrames are distributed (partitioned) across the worker nodes. The DataFrame is the primary abstraction in Spark SQL, and it is immutable: every transformation returns a new DataFrame.
You can think of a DataFrame as a special type of RDD: an RDD of Row objects plus a schema. An RDD, on the other hand, is merely a Resilient Distributed Dataset, more of a black box of data that Spark cannot optimize because the operations that can be performed against it are not as constrained.
However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method.
In general, it is recommended to use a DataFrame where possible because of the built-in query optimization.
You can easily switch from DataFrames to RDDs (1) or from RDDs to DataFrames (2):
(1) val sampleDF = sqlContext.read.json("hdfs://localhost:9000/jsondata.json")
    val sampleRDD = sampleDF.rdd   // DataFrame -> RDD[Row]
(2) val sample_DF = sqlContext.createDataFrame(sampleRDD, sampleDF.schema)   // RDD[Row] -> DataFrame
    // toDF() needs an RDD of tuples or case classes, plus import sqlContext.implicits._
The main disadvantage of RDDs is that they don't perform particularly well whenever Spark needs to distribute the data within the cluster or write it to disk, because it does so using Java serialization by default (although it is possible to use Kryo as a faster alternative in most cases). The overhead of serializing individual Java and Scala objects is expensive and requires sending both data and structure between nodes (each serialized object contains the class structure as well as the values).
There is also the overhead of garbage collection that results from creating and destroying individual objects.
Spark 1.3 introduced a new DataFrame API, which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and pass only the data between nodes, in a much more efficient way than using Java serialization.
There are also advantages when performing computations in a single process, as Spark can serialize the data into off-heap storage in a binary format and then perform many transformations directly on this off-heap memory, avoiding the garbage-collection costs associated with constructing individual objects for each row in the data set.
Because Spark understands the schema, there is no need to use Java serialization to encode the data.
There are some key data management concepts that you need to know:
- A data model is a collection of concepts for describing data.
- A schema is a description of a particular collection of data that uses a given data model.
- An instance is the actual data at a given time.
- Cardinality is the number of rows.
- Degree is the number of columns.
- Each row in a DataFrame is a Row object, and its fields can be accessed via row.name or row['name'].
- Creating a DataFrame from a Python list
1. You create a DataFrame from a data source: from disk or from a Python object.
data = [('pawan',30),('sapan',26)]
df = sqlContext.createDataFrame(data)   # columns get the default names _1, _2
or assign the column names (schema):
df = sqlContext.createDataFrame(data, ['name','age'])
2. You apply transformations to that DataFrame, such as select and filter.
df2 = df.select(df.name, df.age)
or
df2 = df.filter(df.age > 28)
3. Cache a DataFrame that you will reuse.
df.cache()  # keep it in memory for reuse (materialized by the first action)
Otherwise Spark recomputes it from the source, re-reading from disk, every time an action uses it.
4. Then you apply actions to the DataFrame, such as show and count.
df2.collect()  # returns every row to the driver
Avoid collect() on large DataFrames in production applications: it can crash the driver with an out-of-memory error if the driver can't hold the data. Use show() or take(n) instead.
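Points to note
- When you're dealing with grouped data (the result of groupBy()), count() is a transformation that returns a new DataFrame; when you're dealing with a DataFrame, count() is an action that returns a number.
- Transformations are executed on the executors (once an action triggers them); actions run on the executors and return their results to the driver.
- Use unionAll() to combine DataFrames (since Spark 2.0, union() is the preferred name for the same operation); columns are matched by position and duplicates are kept.
- Cache DataFrames that are reused.
- Avoid collect() on large DataFrames in production.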
When you use DataFrames or Spark SQL, you are building up a query plan. Each transformation you apply to a DataFrame adds some information to the query plan. When you finally call an action, which triggers execution of your Spark job, several things happen:
- Spark's Catalyst optimizer analyzes the query plan (called an unoptimized logical query plan) and attempts to optimize it. Optimizations include (but aren't limited to) rearranging and combining filter() operations for efficiency, converting Decimal operations to more efficient long integer operations, and pushing some operations down into the data source (e.g., a filter() operation might be translated to a SQL WHERE clause, if the data source is a traditional SQL RDBMS). The result of this optimization phase is an optimized logical plan.
- Once Catalyst has an optimized logical plan, it then constructs multiple physical plans from it. Specifically, it implements the query in terms of lower-level Spark RDD operations.
- Catalyst chooses which physical plan to use via cost optimization. That is, it determines which physical plan is the most efficient (or least expensive), and uses that one.
- Finally, once the physical RDD execution plan is established, Spark actually executes the job.
You can examine the query plan using the explain() function on a DataFrame. By default, explain() only shows you the final physical plan; however, if you pass it an argument of True, it will show you all phases (the parsed, analyzed and optimized logical plans, plus the physical plan).