Hive is a data warehouse infrastructure built on top of Hadoop that can compile SQL queries as MapReduce jobs and run the jobs in the cluster. Hive is used because the tables in Hive are similar to tables in a relational database. Hive defines a simple SQL-like query language to querying and managing large datasets called Hive-QL ( HQL ).
Usage:
1. Hive provides tools to enable easy data extract/transform/load (ETL)
2. It provides the structure on a variety of data formats.
3. By using Hive, we can access files stored in Hadoop Distributed File System (HDFS is used to querying and managing large datasets residing in) or in other data storage systems such as Apache HBase.
Limitation:
• Hive is not designed for Online transaction processing (OLTP ), it is only used for the Online Analytical Processing.
• Hive supports overwriting or apprehending data, but not updates and deletes.
• In Hive, sub queries are not supported.
Components of Hive:
Metastore :
Hive stores the schema of the Hive tables in a Hive Metastore. Metastore is used to hold all the information about the tables and partitions that are in the warehouse. By default, the metastore is run in the same process as the Hive service and the default Metastore is DerBy Database.
SerDe :
Serializer, Deserializer gives instructions to hive on how to process a record.
Hive Commands :
Data Definition Language (DDL )
DDL statements are used to build and modify the tables and other objects in the database.
Example :
CREATE, DROP, TRUNCATE, ALTER, SHOW, DESCRIBE Statements.
Go to Hive shell by giving the command sudo hive and enter the command ’create database<data base name>’ to create the new database in the Hive.
To list out the databases in Hive warehouse, enter the command ‘show databases’.
The database creates in a default location of the Hive warehouse. In Cloudera, Hive database store in a /user/hive/warehouse.
The command to use the database is USE <data base name>
Copy the input data to HDFS from local by using the copy From Local command.
When we create a table in hive, it creates in the default location of the hive warehouse. – “/user/hive/warehouse”, after creation of the table we can move the data from HDFS to hive table.
The following command creates a table with in location of “/user/hive/warehouse/retail.db”
Describe provides information about the schema of the table.
Data Manipulation Language (DML )
DML statements are used to retrieve, store, modify, delete, insert and update data in the database.
Example :
LOAD, INSERT Statements.
Syntax :
LOAD data <LOCAL> inpath <file path> into table [tablename]
The Load operation is used to move the data into corresponding Hive table. If the keywordlocal is specified, then in the load command will give the local file system path. If the keyword local is not specified we have to use the HDFS path of the file
After loading the data into the Hive table we can apply the Data Manipulation Statements or aggregate functions retrieve the data.
Example to count number of records:
Count aggregate function is used count the total number of the records in a table.
‘create external’ Table :
The create external keyword is used to create a table and provides a location where the table will create, so that Hive does not use a default location for this table. An EXTERNALtable points to any HDFS location for its storage, rather than default storage.
Insert Command:
The insert command is used to load the data Hive table. Inserts can be done to a table or a partition.
• INSERT OVERWRITE is used to overwrite the existing data in the table or partition.
• INSERT INTO is used to append the data into existing data in a table. (Note: INSERT INTO syntax is work from the version 0.8)
Example for ‘Partitioned By’ and ‘Clustered By’ Command :
‘Partitioned by‘ is used to divided the table into the Partition and can be divided in to buckets by using the ‘Clustered By‘ command.
When we insert the data Hive throwing errors, the dynamic partition mode is strict and dynamic partition not enabled (by Jeff at dresshead website). So we need to set the following parameters in Hive shell.
set hive.exec.dynamic.partition=true;
To enable dynamic partitions, by default, it’s false
set hive.exec.dynamic.partition.mode=nonstrict;
Partition is done by the category and can be divided in to buckets by using the ‘Clustered By’ command.
The ‘Drop Table’ statement deletes the data and metadata for a table. In the case of external tables, only the metadata is deleted.
The ‘Drop Table’ statement deletes the data and metadata for a table. In the case of external tables, only the metadata is deleted.
Aggregation :
Select count (DISTINCT category) from tablename;
Grouping :
Group command is used to group the result-set by one or more columns.
Select category, sum( amount) from txt records group by category
It calculates the amount of same category.
The result one table is stored in to another table.
Create table newtablename as select * from oldtablename;
Join:
First, let's discuss how join works in Hive. A common join operation will be compiled to a MapReduce task, as shown in figure 1. A common join task involves a map stage and a reduce stage. A mapper reads from join tables and emits the join key and join value pair into an intermediate file. Hadoop sorts and merges these pairs in what's called the shuffle stage. The reducer takes the sorted results as input and does the actual join work. The shuffle stage is really expensive since it needs to sort and merge. Saving the shuffle and reduce stages improves the task performance.
The motivation of map join is to save the shuffle and reduce stages and do the join work only in the map stage. By doing so, when one of the join tables is small enough to fit into the memory, all the mappers can hold the data in memory and do the join work there. So all the join operations can be finished in the map stage. However there are some scaling problems with this type of map join. When thousands of mappers read the small join table from the Hadoop Distributed File System (HDFS)into memory at the same time, the join table easily becomes the performance bottleneck, causing the mappers to time out during the read operations.
Using the Distributed Cache
Hive-1641 solves this scaling problem. The basic idea of optimization is to create a new MapReduce local task just before the original join MapReduce task. This new task reads the small table data from HDFS to an in-memory hash table. After reading, it serializes the in-memory hash table into a hashtable file. In the next stage, when the MapReduce task is launching, it uploads this hashtable file to the Hadoop distributed cache, which populates these files to each mapper's local disk. So all the mappers can load this persistent hashtable file back into memory and do the join work as before. The execution flow of the optimized map join is shown in figure 2. After optimization, the small table needs to be read just once. Also if multiple mappers are running on the same machine, the distributed cache only needs to push one copy of the hashtable file to this machine.
Since map join is faster than the common join, it's better to run the map join whenever possible. Previously, Hive users needed to give a hint in the query to specify the small table. For example,
select /*+mapjoin(a)*/ * from src1 x join src2 y on x.key=y.key;
.
This isn't a good user experience because sometimes the user may give the wrong hint or may not give any hint at all. It's much better to convert the common join into a map join without user hints.
Converting Joins to Map Joins Based on Size
Hive-1642 solves this problem by converting the common join into a map join automatically. For the map join, the query processor should know which input table is the big table. The other input tables are recognized as the small tables during the execution stage, and these tables need to be held in the memory. However, in general, the query processor has no idea of input file size during compilation time because some of the tables may be intermediate tables generated from sub-queries. So the query processor can only figure out the input file size during the execution time.
As shown in figure 3, the left side flow shows the previous common join execution flow, which is very straightforward. On the other side, the right side flow is the new common join execution flow. During compilation time, the query processor generates a conditional task containing a list of tasks; one of these tasks gets resolved to run during execution time. First, the original common join task should be put into the task list. Then the query processor generates a series of map join tasks by assuming each of the input tables may be the big table. For example,
select * from src1 x join src2y on x.key=y.key
. Because both tables src2 and src1 can be the big table, the processor generates two map join tasks, with one assuming src1 is the big table and the other assuming src2 is the big table.
During the execution stage, the conditional task knows the exact file size of each input table, even if the table is an intermediate one. If all the tables are too large to be converted into map join, then just run the common join task as previously. If one of the tables is large and others are small enough to run map join, then the conditional task will pick the corresponding map join local task to run. By this mechanism, it can convert the common join into a map join automatically and dynamically.
Currently, if the total size of small tables is larger than 25MB, then the conditional task will choose the original common join to run. 25MB is a very conservative number and you can change this number with set hive.smalltable.filesize.
Measuring Performance Improvement
Now let's see how much of a performance improvement we can get after the map join optimization.
As shown in table 1, the optimized map join is 12 to 26 times faster than the previous one. Most of map join performance improvement comes from removing the JDBM component.
Also, let's see how much performance improvement we can get if a common join can be converted into map join. All the join operations in the benchmarks can be converted into map join.
From the results shown in table 2, if the new common join can be converted into map join, it will get 57% - 163% performance improvements.
In order to measure the benefits from this project - we would like to track all instances where join operations are converted to map joins (and similarly cases where the map-join optimization fails because the mappers run out of memory). Hive-1792 allows us to capture such tracking information in a generic way. For our Hive deployment, I developed a Hive execution hook to read the tracking information provided by this issue and record it in an internal database. After this work, we can know exactly how many common joins have been converted into map joins everyday and how much CPU time is saved in the cluster.
No comments:
Post a Comment