
Page 1: Hive and Shark

Hive and Shark

Amir H. [email protected]

Amirkabir University of Technology(Tehran Polytechnic)

Amir H. Payberah (Tehran Polytechnic) Hive and Shark 1393/8/19 1 / 45

Page 2: Hive and Shark

Motivation

- MapReduce is hard to program.

- No schema and no high-level query language, e.g., SQL.

Page 3: Hive and Shark

Solution

- Adding tables, columns, partitions, and a subset of SQL to unstructured data.

Page 4: Hive and Shark

Hive

- A system for managing and querying structured data, built on top of Hadoop.

- Converts a query into a series of MapReduce phases.

- Initially developed by Facebook.

- Focuses on scalability and extensibility.

Page 8: Hive and Shark

Scalability

- Massive scale-out and fault-tolerance capabilities on commodity hardware.

- Can handle petabytes of data.

Page 9: Hive and Shark

Extensibility

- Data types: primitive types and complex types.

- User-Defined Functions (UDFs).

- Serializer/Deserializer: text, binary, JSON, ...

- Storage: HDFS, HBase, S3, ...

Page 11: Hive and Shark

RDBMS vs. Hive

                     RDBMS                        Hive
Language             SQL                          HiveQL
Update capabilities  INSERT, UPDATE, and DELETE   INSERT OVERWRITE; no UPDATE or DELETE
OLAP                 Yes                          Yes
OLTP                 Yes                          No
Latency              Sub-second                   Minutes or more
Indexes              Any number of indexes        No indexes; data is always scanned (in parallel)
Data size            TBs                          PBs

- Online Analytical Processing (OLAP): allows users to analyze database information from multiple database systems at one time.

- Online Transaction Processing (OLTP): facilitates and manages transaction-oriented applications.

Page 12: Hive and Shark

Hive Data Model

- Re-used from RDBMS:
  • Database: a set of tables.
  • Table: a set of rows that have the same schema (the same columns).
  • Row: a single record; a set of columns.
  • Column: provides value and type for a single value.

Page 13: Hive and Shark

Hive Data Model - Table

- Analogous to tables in relational databases.

- Each table has a corresponding HDFS directory.

- For example, the data for a table customer is stored in the directory /db/customer.

Page 14: Hive and Shark

Hive Data Model - Partition

- A coarse-grained partitioning of a table based on the value of a column, such as a date.

- Enables faster queries on slices of the data.

- If customer is partitioned on the column country, then rows with a particular country value, e.g., SE, are stored in files within the directory /db/customer/country=SE.

Page 15: Hive and Shark

Hive Data Model - Bucket

- Data in each partition may in turn be divided into buckets, based on the hash of a column in the table.

- Enables more efficient queries.

- If the customer country partition is subdivided further into buckets, hashed on username, the data for each bucket is stored in files within the directories:
  /db/customer/country=SE/000000_0
  ...
  /db/customer/country=SE/000000_5

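Bucketing is deterministic: a row's bucket is the hash of the bucketing column modulo the number of buckets. A minimal Python sketch of the idea (Python's hash() stands in for Hive's internal hash function, and the path is the hypothetical layout above):

```python
NUM_BUCKETS = 6

def bucket_file(username: str) -> str:
    """Map a row to a bucket file, analogous to hash(column) % num_buckets.
    Python's hash() is only a stand-in for Hive's hash function."""
    bucket = hash(username) % NUM_BUCKETS
    return f"/db/customer/country=SE/000000_{bucket}"

# Rows with the same username always land in the same bucket file,
# which is what makes bucketed joins and sampling efficient.
assert bucket_file("alice") == bucket_file("alice")
assert bucket_file("alice").startswith("/db/customer/country=SE/000000_")
```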

Page 16: Hive and Shark

Column Data Types

- Primitive types
  • integers, floats, strings, dates, and booleans

- Nestable collections
  • array and map

- User-defined types
  • users can also define their own types programmatically

Page 17: Hive and Shark

Hive Operations

- HiveQL: an SQL-like query language

- DDL operations (Data Definition Language)
  • Create, Alter, Drop

- DML operations (Data Manipulation Language)
  • Load and Insert (overwrite)
  • Does not support updating and deleting

- Query operations
  • Select, Filter, Join, Group by

Page 21: Hive and Shark

DDL Operations (1/3)

- Create tables

-- Creates a table with three columns
CREATE TABLE customer (id INT, name STRING, address STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

- Create tables with partitions

-- Creates a table with three columns and a partition column, e.g.,
-- /db/customer2/country=SE
-- /db/customer2/country=IR
CREATE TABLE customer2 (id INT, name STRING, address STRING)
PARTITIONED BY (country STRING);

Page 23: Hive and Shark

DDL Operations (2/3)

- Create tables with buckets

-- Specify the columns to bucket on and the number of buckets, e.g.,
-- /db/customer3/000000_0
-- /db/customer3/000000_1
-- /db/customer3/000000_2
SET hive.enforce.bucketing = true;
CREATE TABLE customer3 (id INT, name STRING, address STRING)
CLUSTERED BY (id) INTO 3 BUCKETS;

- Browsing through tables

-- lists all the tables
SHOW TABLES;

-- shows the list of columns
DESCRIBE customer;

Page 25: Hive and Shark

DDL Operations (3/3)

- Altering tables

-- rename the customer table to alaki
ALTER TABLE customer RENAME TO alaki;

-- add two new columns to the customer table
ALTER TABLE customer ADD COLUMNS (job STRING);
ALTER TABLE customer ADD COLUMNS (grade INT COMMENT 'some comment');

- Dropping tables

DROP TABLE customer;

Page 27: Hive and Shark

DML Operations

- Loading data from flat files

-- if 'LOCAL' is omitted, Hive looks for the file in HDFS.
-- 'OVERWRITE' signifies that existing data in the table is deleted.
-- if 'OVERWRITE' is omitted, data is appended to the existing data.
LOAD DATA LOCAL INPATH 'data.txt' OVERWRITE INTO TABLE customer;

-- loads data into different partitions
LOAD DATA LOCAL INPATH 'data1.txt' OVERWRITE INTO TABLE customer2
PARTITION (country='SE');
LOAD DATA LOCAL INPATH 'data2.txt' OVERWRITE INTO TABLE customer2
PARTITION (country='IR');

- Storing query results in tables

INSERT OVERWRITE TABLE customer SELECT * FROM old_customers;

Page 29: Hive and Shark

Query Operations (1/3)

- Selects and filters

SELECT id FROM customer2 WHERE country='SE';

-- selects all rows from the customer table into a local directory
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/hive-sample-out'
SELECT * FROM customer;

-- selects all rows from the customer2 table into a directory in HDFS
INSERT OVERWRITE DIRECTORY '/tmp/hdfs_ir'
SELECT * FROM customer2 WHERE country='IR';

Page 30: Hive and Shark

Query Operations (2/3)

- Aggregations and groups

SELECT MAX(id) FROM customer;

SELECT country, COUNT(*), SUM(id) FROM customer2 GROUP BY country;

INSERT OVERWRITE TABLE high_id_customer
SELECT c.name, COUNT(*) FROM customer c
WHERE c.id > 10 GROUP BY c.name;

Page 31: Hive and Shark

Query Operations (3/3)

- Join

CREATE TABLE customer (id INT, name STRING, address STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

CREATE TABLE order (id INT, cus_id INT, prod_id INT, price INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

SELECT * FROM customer c JOIN order o ON (c.id = o.cus_id);

INSERT OVERWRITE TABLE order_customer
SELECT c.id, c.name, c.address, ce.exp
FROM customer c JOIN
  (SELECT cus_id, SUM(price) AS exp FROM order GROUP BY cus_id) ce
ON (c.id = ce.cus_id);

Page 32: Hive and Shark

User-Defined Function (UDF)

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public final class Lower extends UDF {
  public Text evaluate(final Text s) {
    if (s == null) { return null; }
    return new Text(s.toString().toLowerCase());
  }
}

-- Register the class
CREATE FUNCTION my_lower AS 'com.example.hive.udf.Lower';

-- Using the function
SELECT my_lower(title), SUM(freq) FROM titles GROUP BY my_lower(title);

Page 33: Hive and Shark

Executing SQL Queries

- Hive processes HiveQL statements and generates the execution plan through a three-phase process.

1. Query parsing: transforms a query string into a parse tree representation.

2. Logical plan generation: converts the internal query representation into a logical plan, and optimizes it.

3. Physical plan generation: splits the optimized logical plan into multiple map/reduce and HDFS tasks.

Page 34: Hive and Shark

Optimization (1/2)

- Column pruning
  • Projecting out only the needed columns.

- Predicate pushdown
  • Filtering rows early in the processing, by pushing down predicates to the scan (if possible).

- Partition pruning
  • Pruning out files of partitions that do not satisfy the predicate.
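Partition pruning can be pictured as selecting directories before any data is read. A small illustrative Python sketch (the paths and file names are hypothetical; Hive does this inside the compiler using Metastore metadata):

```python
# Hypothetical partition directories of the customer2 table.
partitions = {
    "/db/customer2/country=SE": ["data1.txt"],
    "/db/customer2/country=IR": ["data2.txt"],
}

def prune(partitions, country):
    """Keep only the partition directories whose key matches the predicate,
    so files in the other partitions are never scanned."""
    suffix = f"country={country}"
    return {p: files for p, files in partitions.items() if p.endswith(suffix)}

pruned = prune(partitions, "SE")
assert list(pruned) == ["/db/customer2/country=SE"]
```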

Page 35: Hive and Shark

Optimization (2/2)

- Map-side joins
  • The small tables are replicated in all the mappers and joined with the other tables there.
  • No reducer is needed.

- Join reordering
  • Only the small tables are materialized and kept in memory.
  • This ensures that the join operation does not exceed memory limits on the reducer side.
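The map-side join idea can be sketched in a few lines of Python: the small table is loaded into an in-memory hash map (as if replicated to every mapper), and each row of the large table is joined by lookup, with no shuffle or reduce step. Table contents here are invented for illustration:

```python
# Small table, loaded into memory on every "mapper": id -> name.
customers = {1: "Alice", 2: "Bob"}

# Large table, streamed record by record: (order_id, cus_id, price).
orders = [(101, 1, 50), (102, 2, 30), (103, 1, 20)]

def map_side_join(orders, customers):
    """Join each streamed order against the in-memory customer table."""
    for order_id, cus_id, price in orders:
        if cus_id in customers:
            yield (order_id, customers[cus_id], price)

result = list(map_side_join(orders, customers))
assert result == [(101, "Alice", 50), (102, "Bob", 30), (103, "Alice", 20)]
```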

Page 36: Hive and Shark

Hive Components (1/8)

(architecture diagram)

Page 37: Hive and Shark

Hive Components (2/8)

- External interfaces
  • User interfaces, e.g., CLI and web UI.
  • Application programming interfaces, e.g., JDBC and ODBC.
  • Thrift, a framework for cross-language services.

Page 38: Hive and Shark

Hive Components (3/8)

- Driver
  • Manages the life cycle of a HiveQL statement during compilation, optimization, and execution.

Page 39: Hive and Shark

Hive Components (4/8)

- Compiler (Parser/Query Optimizer)
  • Translates the HiveQL statement into a logical plan, and optimizes it.

Page 40: Hive and Shark

Hive Components (5/8)

- Physical plan
  • Transforms the logical plan into a DAG of Map/Reduce jobs.

Page 41: Hive and Shark

Hive Components (6/8)

- Execution engine
  • The driver submits the individual MapReduce jobs from the DAG to the execution engine in topological order.

Page 42: Hive and Shark

Hive Components (7/8)

- SerDe
  • Serializer/Deserializer; allows Hive to read and write table rows in any custom format.

Page 43: Hive and Shark

Hive Components (8/8)

- Metastore
  • The system catalog.
  • Contains metadata about the tables.
  • Metadata is specified during table creation and reused every time the table is referenced in HiveQL.
  • Metadata is stored in either a traditional relational database, e.g., MySQL, or a file system, but not in HDFS.

Page 44: Hive and Shark

Hive on Spark


Page 45: Hive and Shark

Spark RDD - Reminder

- RDDs are immutable, partitioned collections that can be created through various transformations, e.g., map, groupByKey, and join.

Page 46: Hive and Shark

Executing SQL over Spark RDDs

- Shark runs SQL queries over Spark using a three-step process:

1. Query parsing: Shark uses the Hive query compiler to parse the query and generate a parse tree.

2. Logical plan generation: the tree is turned into a logical plan and basic logical optimization is applied.

3. Physical plan generation: Shark applies additional optimization and creates a physical plan consisting of transformations on RDDs.

Page 47: Hive and Shark

Hive Components

(architecture diagram)

Page 48: Hive and Shark

Shark Components

(architecture diagram)

Page 49: Hive and Shark

Shark and Spark

- Shark extends the RDD execution model with:
  • Partial DAG Execution (PDE): re-optimizes a running query after running the first few stages of its task DAG.
  • In-memory columnar storage and compression: processes relational data efficiently.
  • Control over data partitioning.

Page 50: Hive and Shark

Partial DAG Execution (1/2)

- How to optimize the following query?

SELECT * FROM table1 a JOIN table2 b ON (a.key = b.key)
WHERE my_crazy_udf(b.field1, b.field2) = true;

- It cannot use cost-based optimization techniques that rely on accurate a priori data statistics.

- Such queries require dynamic approaches to query optimization.

- Partial DAG Execution (PDE): dynamic alteration of query plans based on data statistics collected at run-time.

Page 52: Hive and Shark

Partial DAG Execution (2/2)

- The workers gather customizable statistics at global and per-partition granularities at run-time.

- Each worker sends the collected statistics to the master.

- The master aggregates the statistics and alters the query plan based on them.
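As a concrete picture of how run-time statistics can alter a plan, here is a toy Python sketch: workers report per-partition row counts, the master aggregates them and switches join strategy. The statistics, threshold, and strategy names are all made up for illustration; Shark's actual PDE logic is more involved:

```python
# Hypothetical per-partition row counts reported by two workers at run-time.
worker_stats = [
    {"t2_part0": 120, "t2_part1": 95},  # worker 1
    {"t2_part2": 80},                   # worker 2
]

BROADCAST_THRESHOLD = 1000  # rows; a made-up tuning knob

def choose_join_strategy(worker_stats):
    """Master aggregates the workers' statistics, then alters the plan:
    a table that turned out to be small is broadcast (map-side join)."""
    total = sum(n for stats in worker_stats for n in stats.values())
    return "map-side join" if total < BROADCAST_THRESHOLD else "shuffle join"

assert choose_join_strategy(worker_stats) == "map-side join"
```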

Page 53: Hive and Shark

Columnar Memory Store

- Simply caching Hive records as JVM objects is inefficient.

- 12 to 16 bytes of overhead per object in the JVM implementation:
  • e.g., storing a 270MB table as JVM objects uses approximately 971MB of memory.

- Shark employs column-oriented storage using arrays of primitive objects.
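The difference can be illustrated in Python (standing in for the JVM here): a row store keeps one object per row, while a column store packs each column of primitives into a flat array with essentially no per-element object overhead. The table contents are invented for the example:

```python
from array import array

# Row-oriented: one dict object per row, with per-object overhead.
rows = [{"id": 1, "price": 50}, {"id": 2, "price": 30}, {"id": 3, "price": 20}]

# Column-oriented: each column is one contiguous array of primitives.
columns = {
    "id": array("i", [1, 2, 3]),
    "price": array("i", [50, 30, 20]),
}

# A scan of one column touches only that column's array.
assert sum(columns["price"]) == 100
assert columns["id"][1] == rows[1]["id"]
```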

Page 54: Hive and Shark

Data Partitioning

- Shark allows co-partitioning two tables that are frequently joined together on a common key, for faster joins in subsequent queries.

Page 55: Hive and Shark

Shark/Spark Integration

- Shark provides a simple API, sql2rdd, for programmers to convert the results of SQL queries into a special type of RDD.

val youngUsers = sql2rdd("SELECT * FROM users WHERE age < 20")
println(youngUsers.count)
val featureMatrix = youngUsers.map(extractFeatures(_))
kmeans(featureMatrix)

Page 56: Hive and Shark

Summary

- Operations: DDL, DML, and SQL queries

- Hive architecture vs. Shark architecture

- Shark adds advanced features to Spark, e.g., PDE and a columnar memory store

Page 57: Hive and Shark

Questions?
