Drill can be used to query hive tables. Please note that Drill doesn't use MapReduce as its for interactive purpose and not batch. Drill currently does not support writing Hive tables. The approach I would use is to have Drill write files and read the files via Hive External tables. Drill support ANSI SQL support basic datatypes. Please keep in mind that Drill does not support the following Hive types:
LIST
MAP
STRUCT
TIMESTAMP (Unix Epoch format)
UNION
If Decimal datatype is diabled in Drill, you can enable it as below
set the planner.enable_decimal_data_type option to true.
How to access hive tables in Drill - example sample
1. create external table in hive - store it as file
hive> CREATE EXTERNAL TABLE types_demo (
a bigint,
b boolean,
c DECIMAL(3, 2),
d double,
e float,
f INT,
g VARCHAR(64),
h date,
i timestamp
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE LOCATION '/mapr/demo.mapr.com/data/mytypes.csv';
2. Configure Storage plugin (Hive storage plugin connects Drill to the Hive metastore containing the data.)
What is it : Drill is designed to be a distributed SQL query engine. You can compare it with SparkSQL. Spark contains many sub projects and the piece that directly compares with Drill is SparkSQL If you need to perform complex math, statistics, or machine learning, then Apache Spark is a good place for you to start. SQL vs SQL like :Drill supports ANSI SQL:2003. Data Formats : Apache Drill is that it can discover the schema on the fly as you query any data.It can integrate with several data sources like Hive, HBase, MongoDB, file system, RDBMS. Also, input formats like Avro, CSV, TSV, PSV, Parquet, Hadoop Sequence files, and many others can be used in Drill with ease. Best format for drill is Parquet. Access : There are multiple choice for accessing Drill. It can be accessed via the Drill shell, web interface, ReST interface, or through JDBC/ODBC drivers. Security : Views can aggregate data from several sources and hide the underlying complexities of the data sources. Security through impersonation leverages views. Impersonation and views together offer fine grained security at the file level. A data owner can create a view which selects some limited set of data from the raw data source. They can then grant privileges in the file system for other users to execute that view to query the underlying data without giving the user the ability to read the underlying file directly. The table below describes at a high level some of the key considerations for picking the right "SQL-on-Hadoop" technology
Drill
Hive
Impala
Spark SQL
Key Use Cases
Self-service Data
Exploration Interactive BI / Ad-hoc queries
Batch/ ETL/
Long-running jobs
Interactive BI /
Ad-hoc queries
SQL as part of Spark
pipelines / Advanced analytic workflows
Data Sources
Files Support
Parquet, JSON, Text,
all Hive file formats
Yes (all Hive file
formats)
Yes (Parquet,
Sequence, RC, Text, AVRO ...)
Parquet, JSON, Text,
all Hive file formats
HBase/MapR-DB
Yes
Yes
Yes
Yes
Beyond Hadoop
Yes
No
No
Yes
Data Types
Relational
Yes
Yes
Yes
Yes
Complex/Nested
Yes
Limited
No
Limited
Metadata
Schema-less/Dynamic schema
Yes
No
No
Limited
Hive Meta store
Yes
Yes
Yes
Yes
SQL / BI tools
SQL support
ANSI SQL
HiveQL
HiveQL
ANSI SQL (limited)
& HiveQL
Client support
ODBC/JDBC
ODBC/JDBC
ODBC/JDBC
ODBC/JDBC
Beyond Memory
Yes
Yes
Yes
Yes
Optimizer
Limited
Limited
Limited
Limited
Platform
Latency
Low
Medium
Low
Low (in-memory) /
Medium
Concurrency
High
Medium
High
Medium
Decentralized Granular Security
Yes
No
No
No
Functioning :
Drillbit is Apache Drill’s daemon that runs on each node in the cluster. It uses ZooKeeper for all the communication in the cluster and maintains cluster membership. It is responsible for accepting requests from the client, processing the queries, and returning results to the client. The drillbit which receives the request from the client is called ‘foreman’. It generates the execution plan, the execution fragments are sent to other drillbits running in the cluster.
you may like to edit conf/drill-env.sh for memory setting and conf/drill-override.conf for cluster id and zookeeper host & port
Hive is a data warehouse infrastructure built on top of Hadoop that can compile SQL queries as MapReduce jobs and run the jobs in the cluster.Hive is used because the tables in Hive are similar to tables in a relational database. Hive defines a simple SQL-like query language to querying and managing large datasets called Hive-QL ( HQL ).
Usage:
1. Hive provides tools to enable easy data extract/transform/load (ETL)
2. It provides the structure on a variety of data formats.
3. By using Hive, we can access files stored in Hadoop Distributed File System (HDFS is used to querying and managing large datasets residing in) or in other data storage systems such as Apache HBase.
Limitation:
• Hive is not designed for Online transaction processing (OLTP ), it is only used for the Online Analytical Processing.
• Hive supports overwriting or apprehending data, but not updates and deletes.
• In Hive, sub queries are not supported.
Components of Hive:
Metastore :
Hive stores the schema of the Hive tables in a Hive Metastore. Metastore is used to hold all the information about the tables and partitions that are in the warehouse. By default, the metastore is run in the same process as the Hive service and the default Metastore is DerBy Database.
SerDe :
Serializer, Deserializer gives instructions to hive on how to process a record.
Hive Commands :
Data Definition Language (DDL )
DDL statements are used to build and modify the tables and other objects in the database.
Go to Hive shell by giving the command sudo hive and enter the command ’create database<data base name>’ to create the new database in the Hive.
To list out the databases in Hive warehouse, enter the command ‘show databases’.
The database creates in a default location of the Hive warehouse. In Cloudera, Hive database store in a /user/hive/warehouse.
The command to use the database is USE <data base name>
Copy the input data to HDFS from local by using the copy From Local command.
When we create a table in hive, it creates in the default location of the hive warehouse. – “/user/hive/warehouse”, after creation of the table we can move the data from HDFS to hive table.
The following command creates a table with in location of “/user/hive/warehouse/retail.db”
Describe provides information about the schema of the table.
Data Manipulation Language (DML )
DML statements are used to retrieve, store, modify, delete, insert and update data in the database.
Example :
LOAD, INSERT Statements.
Syntax :
LOAD data <LOCAL> inpath <file path> into table [tablename]
The Load operation is used to move the data into corresponding Hive table. If the keywordlocal is specified, then in the load command will give the local file system path. If the keyword local is not specified we have to use the HDFS path of the file
After loading the data into the Hive table we can apply the Data Manipulation Statements or aggregate functions retrieve the data.
Example to count number of records:
Count aggregate function is used count the total number of the records in a table.
‘create external’ Table :
The create external keyword is used to create a table and provides a locationwhere the table will create, so that Hive does not use a default location for this table. An EXTERNALtable points to any HDFS location for its storage, rather than default storage.
Insert Command:
The insert command is used to load the data Hive table. Inserts can be done to a table or a partition.
• INSERT OVERWRITE is used to overwrite the existing data in the table or partition.
• INSERT INTO is used to append the data into existing data in a table. (Note: INSERT INTO syntax is work from the version 0.8)
Example for ‘Partitioned By’ and ‘Clustered By’ Command :
‘Partitioned by‘ is used to divided the table into the Partition and can be divided in to buckets by using the ‘Clustered By‘ command.
When we insert the data Hive throwing errors, the dynamic partition mode is strict and dynamic partition not enabled (by Jeff at dresshead website). So we need to set the following parameters in Hive shell.
set hive.exec.dynamic.partition=true;
To enable dynamic partitions, by default, it’s false
set hive.exec.dynamic.partition.mode=nonstrict;
Partition is done by the category and can be divided in to buckets by using the ‘Clustered By’ command.
The ‘Drop Table’ statement deletes the data and metadata for a table. In the case of external tables, only the metadata is deleted.
The ‘Drop Table’ statement deletes the data and metadata for a table. In the case of external tables, only the metadata is deleted.
Aggregation :
Select count (DISTINCT category) from tablename;
Grouping :
Group command is used to group the result-set by one or more columns.
Select category, sum( amount) from txt records group by category
It calculates the amount of same category.
The result one table is stored in to another table.
Create table newtablename as select * from oldtablename;
Join:
First, let's discuss how join works in Hive. A common join operation will be compiled to a MapReduce task, as shown in figure 1. A common join task involves a map stage and a reduce stage. A mapper reads from join tables and emits the join key and join value pair into an intermediate file. Hadoop sorts and merges these pairs in what's called the shuffle stage. The reducer takes the sorted results as input and does the actual join work. The shuffle stage is really expensive since it needs to sort and merge. Saving the shuffle and reduce stages improves the task performance.
The motivation of map join is to save the shuffle and reduce stages and do the join work only in the map stage. By doing so, when one of the join tables is small enough to fit into the memory, all the mappers can hold the data in memory and do the join work there. So all the join operations can be finished in the map stage. However there are some scaling problems with this type of map join. When thousands of mappers read the small join table from the Hadoop Distributed File System (HDFS)into memory at the same time, the join table easily becomes the performance bottleneck, causing the mappers to time out during the read operations.
Using the Distributed Cache
Hive-1641 solves this scaling problem. The basic idea of optimization is to create a new MapReduce local task just before the original join MapReduce task. This new task reads the small table data from HDFS to an in-memory hash table. After reading, it serializes the in-memory hash table into a hashtable file. In the next stage, when the MapReduce task is launching, it uploads this hashtable file to the Hadoop distributed cache, which populates these files to each mapper's local disk. So all the mappers can load this persistent hashtable file back into memory and do the join work as before. The execution flow of the optimized map join is shown in figure 2. After optimization, the small table needs to be read just once. Also if multiple mappers are running on the same machine, the distributed cache only needs to push one copy of the hashtable file to this machine.
Since map join is faster than the common join, it's better to run the map join whenever possible. Previously, Hive users needed to give a hint in the query to specify the small table. For example,select /*+mapjoin(a)*/ * from src1 x join src2 y on x.key=y.key;.
This isn't a good user experience because sometimes the user may give the wrong hint or may not give any hint at all. It's much better to convert the common join into a map join without user hints.
Converting Joins to Map Joins Based on Size
Hive-1642 solves this problem by converting the common join into a map join automatically. For the map join, the query processor should know which input table is the big table. The other input tables are recognized as the small tables during the execution stage, and these tables need to be held in the memory. However, in general, the query processor has no idea of input file size during compilation time because some of the tables may be intermediate tables generated from sub-queries. So the query processor can only figure out the input file size during the execution time.
As shown in figure 3, the left side flow shows the previous common join execution flow, which is very straightforward. On the other side, the right side flow is the new common join execution flow. During compilation time, the query processor generates a conditional task containing a list of tasks; one of these tasks gets resolved to run during execution time. First, the original common join task should be put into the task list. Then the query processor generates a series of map join tasks by assuming each of the input tables may be the big table. For example, select * from src1 x join src2y on x.key=y.key. Because both tables src2 and src1 can be the big table, the processor generates two map join tasks, with one assuming src1 is the big table and the other assuming src2 is the big table.
During the execution stage, the conditional task knows the exact file size of each input table, even if the table is an intermediate one. If all the tables are too large to be converted into map join, then just run the common join task as previously. If one of the tables is large and others are small enough to run map join, then the conditional task will pick the corresponding map join local task to run. By this mechanism, it can convert the common join into a map join automatically and dynamically.
Currently, if the total size of small tables is larger than 25MB, then the conditional task will choose the original common join to run. 25MB is a very conservative number and you can change this number with set hive.smalltable.filesize.
Measuring Performance Improvement
Now let's see how much of a performance improvement we can get after the map join optimization.
As shown in table 1, the optimized map join is 12 to 26 times faster than the previous one. Most of map join performance improvement comes from removing the JDBM component.
Also, let's see how much performance improvement we can get if a common join can be converted into map join. All the join operations in the benchmarks can be converted into map join.
From the results shown in table 2, if the new common join can be converted into map join, it will get 57% - 163% performance improvements.
In order to measure the benefits from this project - we would like to track all instances where join operations are converted to map joins (and similarly cases where the map-join optimization fails because the mappers run out of memory). Hive-1792 allows us to capture such tracking information in a generic way. For our Hive deployment, I developed a Hive execution hook to read the tracking information provided by this issue and record it in an internal database. After this work, we can know exactly how many common joins have been converted into map joins everyday and how much CPU time is saved in the cluster.
Ability to exploit analytics (application development)
3rd platform application (mobile app development, web site app development)
Analytics exposed as services to applications (API’s)
Integrate in-memory and/or in-database scoring and recommendations into business process and operational systems
Do and Donts while think of designing data lake
1. Dont feed data lake from data warehouse
Loading data into a data warehouse means that someone has already made assumptions about what data, level of granularity and amount of history is important. valuable data is filtered out during this process. use data lake as a data repository to store any and all data (structured AND unstructured; internal AND external)
benefits - fast data injection, availability of data to data scientific for analysis.
2. Try to create single data lake for all your data
3. Key differences between data warehouse and data lake.