Post on 21-Dec-2015
The Pig Experience: Building High-Level Data Flows on top of Map-Reduce
DISTRIBUTED INFORMATION SYSTEMS
Presenter: Javeria Iqbal
Tutor: Dr. Martin Theobald
Outline
• Map-Reduce and the need for Pig Latin
• Pig Latin
• Compilation into Map-Reduce
• Optimization
• Future Work
Data Processing Renaissance
Internet companies are swimming in data:
• TBs/day for Yahoo! or Google
• PBs/day for Facebook
Data analysis is the "inner loop" of product innovation.
Data Warehousing …?
Scale: Often not scalable enough
Price: Prohibitively expensive at web scale
• Up to $200K/TB
SQL:
• High-level declarative approach
• Little control over execution method
Map-Reduce
• Map: performs filtering
• Reduce: performs the aggregation
• These two high-level primitives enable parallel processing
• But there is no built-in support for complex database operations, e.g., joins
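The division of labor between a filtering Map and an aggregating Reduce can be sketched in a few lines of plain Python. This is a toy in-memory simulation for illustration (the function names are ours, not the Hadoop API):

```python
from itertools import groupby
from operator import itemgetter

def map_phase(records, map_fn):
    # Apply the user-defined Map function to every input record;
    # each call may emit zero or more (key, value) pairs (filtering).
    pairs = []
    for rec in records:
        pairs.extend(map_fn(rec))
    return pairs

def reduce_phase(pairs, reduce_fn):
    # Group pairs by key (the "shuffle"), then apply the user-defined
    # Reduce function to each key and its bag of values (aggregation).
    pairs.sort(key=itemgetter(0))
    return {k: reduce_fn(k, [v for _, v in grp])
            for k, grp in groupby(pairs, key=itemgetter(0))}

# Example: count visits per url, keeping only .com sites (Map filters).
visits = [("Amy", "cnn.com"), ("Amy", "bbc.com"), ("Fred", "cnn.com")]
counts = reduce_phase(
    map_phase(visits, lambda r: [(r[1], 1)] if r[1].endswith(".com") else []),
    lambda key, vals: sum(vals))
# counts == {"bbc.com": 1, "cnn.com": 2}
```

Note that a join of two data sets does not fit this single-input template, which is exactly the gap Pig Latin fills.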
Execution Overview of Map-Reduce
• Split the program
• Master and worker threads
• Each map worker reads and parses key/value pairs and passes them to the user-defined Map function
• Buffered pairs are written to local disk partitions; the locations of the buffered pairs are sent to the reduce workers
• Each reduce worker sorts the data by the intermediate keys
• Unique keys and their values are passed to the user's Reduce function; the output is appended to the output file for this reduce partition
The Map-Reduce Appeal
Scale:
• Scalable due to simpler design
• Explicit programming model
• Only parallelizable operations
Price:
• Runs on cheap commodity hardware
• Less administration
SQL: Procedural control, a processing "pipe"
Disadvantages
1. Extremely rigid data flow (Map → Reduce); other flows, such as joins, unions, splits, and chains (M-R-M), must be hacked in
2. Common operations must be coded by hand
• Join, filter, projection, aggregates, sorting, distinct
3. Semantics hidden inside map-reduce functions
• Difficult to maintain, extend, and optimize
4. No combined processing of multiple data sets
• Joins and other data processing operations
Motivation
Need a high-level, general data flow language
Enter Pig Latin
Outline
• Map-Reduce and the need for Pig Latin
• Pig Latin
• Compilation into Map-Reduce
• Optimization
• Future Work
Pig Latin: Data Types
• Rich yet simple data model
Simple types: int, long, double, chararray, bytearray
Complex types:
• Atom: string or number, e.g. ('apple')
• Tuple: collection of fields, e.g. ('apple', 'mango')
• Bag: collection of tuples, e.g.
{ ('apple', 'mango'), ('apple', ('red', 'yellow')) }
• Map: key/value pairs
Example: Data Model
• Atom: contains a single atomic value, e.g. 'alice'
• Tuple: a sequence of fields, e.g. ('lakers', 'ipod')
• Bag: a collection of tuples, with possible duplicates
Pig Latin: Input/Output Data
Input:
queries = LOAD `query_log.txt'
          USING myLoad()
          AS (userId, queryString, timestamp);
Output:
STORE query_revenues INTO `myoutput'
      USING myStore();
Pig Latin: General Syntax
• Discarding unwanted data: FILTER
• Comparison operators such as ==, eq, !=, neq
• Logical connectors AND, OR, NOT
Pig Latin: Expression Table
Pig Latin: FOREACH with Flatten
expanded_queries = FOREACH queries GENERATE userId, expandQuery(queryString);

expanded_queries = FOREACH queries GENERATE userId, FLATTEN(expandQuery(queryString));
Pig Latin: COGROUP
• Getting related data together: COGROUP
Suppose we have two data sets:
result: (queryString, url, position)
revenue: (queryString, adSlot, amount)
grouped_data = COGROUP result BY queryString, revenue BY queryString;
Pig Latin: COGROUP vs. JOIN
Pig Latin: Map-Reduce
• Map-Reduce in Pig Latin
map_result = FOREACH input GENERATE FLATTEN(map(*));
key_group = GROUP map_result BY $0;
output = FOREACH key_group GENERATE reduce(*);
Pig Latin: Other Commands
• UNION: returns the union of two or more bags
• CROSS: returns the cross product
• ORDER: orders a bag by the specified field(s)
• DISTINCT: eliminates duplicate tuples in a bag
Pig Latin: Nested Operations
grouped_revenue = GROUP revenue BY queryString;
query_revenues = FOREACH grouped_revenue {
    top_slot = FILTER revenue BY adSlot eq `top';
    GENERATE queryString, SUM(top_slot.amount), SUM(revenue.amount);
};
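The nested FOREACH computes, per query string, the revenue from top ad slots and the total revenue. The same computation can be simulated in plain Python (a sketch with made-up sample data):

```python
from collections import defaultdict

revenue = [("lakers", "top", 50), ("lakers", "side", 20),
           ("kings", "top", 30), ("kings", "side", 10)]

# GROUP revenue BY queryString
grouped_revenue = defaultdict(list)
for query, ad_slot, amount in revenue:
    grouped_revenue[query].append((ad_slot, amount))

# FOREACH group: FILTER the nested bag, then GENERATE two sums
query_revenues = {}
for query, bag in grouped_revenue.items():
    top_slot = [(s, a) for s, a in bag if s == "top"]      # nested FILTER
    query_revenues[query] = (sum(a for _, a in top_slot),  # SUM(top_slot.amount)
                             sum(a for _, a in bag))       # SUM(revenue.amount)
# query_revenues == {"lakers": (50, 70), "kings": (30, 40)}
```

The point of the nested syntax is that the FILTER runs inside each group, over the group's bag, not over the whole relation.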
Pig Pen: Screen Shot
Pig Latin: Example 1
Suppose we have a table
urls: (url, category, pagerank)
A simple SQL query that finds, for each sufficiently large category, the average pagerank of high-pagerank urls in that category:
SELECT category, AVG(pagerank)
FROM urls WHERE pagerank > 0.2
GROUP BY category HAVING COUNT(*) > 10^6
Data Flow
Filter good_urls by pagerank > 0.2
Group by category
Filter category by count > 10^6
Foreach category generate avg. pagerank
Equivalent Pig Latin
• good_urls = FILTER urls BY pagerank > 0.2;
• groups = GROUP good_urls BY category;
• big_groups = FILTER groups BY COUNT(good_urls) > 10^6;
• output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);
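As a cross-check, the four Pig Latin steps can be traced over a tiny in-memory table in Python (a sketch with invented rows and a toy group-size threshold of 1 instead of 10^6):

```python
from collections import defaultdict

urls = [
    ("cnn.com", "News", 0.9),
    ("bbc.com", "News", 0.8),
    ("flickr.com", "Photos", 0.7),
    ("espn.com", "Sports", 0.1),
]

# good_urls = FILTER urls BY pagerank > 0.2
good_urls = [u for u in urls if u[2] > 0.2]

# groups = GROUP good_urls BY category
groups = defaultdict(list)
for url, category, pagerank in good_urls:
    groups[category].append(pagerank)

# big_groups = FILTER groups BY COUNT(good_urls) > 1   (10^6 in the real query)
big_groups = {c: prs for c, prs in groups.items() if len(prs) > 1}

# output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank)
output = {c: sum(prs) / len(prs) for c, prs in big_groups.items()}
# only "News" qualifies; its average pagerank is about 0.85
```

Each Pig Latin statement corresponds to exactly one step of this pipeline, which is the sense in which Pig Latin is an explicit dataflow rather than a single declarative block.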
Example 2: Data Analysis Task
Visits:
User  Url         Time
Amy   cnn.com     8:00
Amy   bbc.com     10:00
Amy   flickr.com  10:05
Fred  cnn.com     12:00

Url Info:
Url         Category  PageRank
cnn.com     News      0.9
bbc.com     News      0.8
flickr.com  Photos    0.7
espn.com    Sports    0.9

Find the top 10 most visited pages in each category.
Data Flow
Load Visits
Group by url
Foreach url generate count
Load Url Info
Join on url
Group by category
Foreach category generate top10 urls
Equivalent Pig Latin
visits = load '/data/visits' as (user, url, time);
gVisits = group visits by url;
visitCounts = foreach gVisits generate url, count(visits);

urlInfo = load '/data/urlInfo' as (url, category, pRank);
visitCounts = join visitCounts by url, urlInfo by url;

gCategories = group visitCounts by category;
topUrls = foreach gCategories generate top(visitCounts,10);

store topUrls into '/data/topUrls';
Quick Start and Interoperability
Operates directly over files.

visits = load '/data/visits' as (user, url, time);
gVisits = group visits by url;
visitCounts = foreach gVisits generate url, count(visits);
urlInfo = load '/data/urlInfo' as (url, category, pRank);
visitCounts = join visitCounts by url, urlInfo by url;
gCategories = group visitCounts by category;
topUrls = foreach gCategories generate top(visitCounts,10);
store topUrls into '/data/topUrls';
Quick Start and Interoperability
Schemas are optional and can be assigned dynamically.

visits = load '/data/visits' as (user, url, time);
gVisits = group visits by url;
visitCounts = foreach gVisits generate url, count(visits);
urlInfo = load '/data/urlInfo' as (url, category, pRank);
visitCounts = join visitCounts by url, urlInfo by url;
gCategories = group visitCounts by category;
topUrls = foreach gCategories generate top(visitCounts,10);
store topUrls into '/data/topUrls';
User-Code as a First-Class Citizen
User-defined functions (UDFs) can be used in every construct:
• Load, Store
• Group, Filter, Foreach

• Pig Latin has a fully nested data model with:
– Atomic values, tuples, bags (lists), and maps
• Avoids expensive joins
Nested Data Model
Example: ('yahoo', {(finance), (email), (news)})
• Common case: aggregation on these nested sets
• Power users: sophisticated UDFs, e.g., sequence analysis
• Efficient implementation (see paper)
Nested Data Model
Decouples grouping as an independent operation.

Visits:
User  Url      Time
Amy   cnn.com  8:00
Amy   bbc.com  10:00
Amy   bbc.com  10:05
Fred  cnn.com  12:00

After group by url:
group    Visits
cnn.com  { (Amy, cnn.com, 8:00), (Fred, cnn.com, 12:00) }
bbc.com  { (Amy, bbc.com, 10:00), (Amy, bbc.com, 10:05) }
"I frankly like pig much better than SQL in some respects (group + optional flatten; I love nested data structures)."
Ted Dunning, Chief Scientist, Veoh
CoGroup
results:
query   url       rank
Lakers  nba.com   1
Lakers  espn.com  2
Kings   nhl.com   1
Kings   nba.com   2

revenue:
query   adSlot  amount
Lakers  top     50
Lakers  side    20
Kings   top     30
Kings   side    10

After COGROUP by query:
group   results                                          revenue
Lakers  { (Lakers, nba.com, 1), (Lakers, espn.com, 2) }  { (Lakers, top, 50), (Lakers, side, 20) }
Kings   { (Kings, nhl.com, 1), (Kings, nba.com, 2) }     { (Kings, top, 30), (Kings, side, 10) }

The cross-product of the two bags would give the natural join.
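That relationship between COGROUP and JOIN can be sketched in Python (the helper name `cogroup` is ours; this is an in-memory toy, not Pig's implementation):

```python
from collections import defaultdict
from itertools import product

def cogroup(left, right, key):
    # Collect, per key, one bag of tuples from each input.
    out = defaultdict(lambda: ([], []))
    for t in left:
        out[key(t)][0].append(t)
    for t in right:
        out[key(t)][1].append(t)
    return dict(out)

results = [("Lakers", "nba.com", 1), ("Lakers", "espn.com", 2),
           ("Kings", "nhl.com", 1), ("Kings", "nba.com", 2)]
revenue = [("Lakers", "top", 50), ("Lakers", "side", 20),
           ("Kings", "top", 30), ("Kings", "side", 10)]

grouped = cogroup(results, revenue, key=lambda t: t[0])
# Taking the cross product of the two bags per key yields the join:
joined = [r + s for bag_r, bag_s in grouped.values()
          for r, s in product(bag_r, bag_s)]
# 2 results x 2 revenue rows per key -> 8 joined tuples in total
```

COGROUP stops at `grouped`, handing the user both bags; JOIN is just the extra flattening step.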
Pig Features
• Explicit data flow language, unlike SQL
• Higher-level than the low-level, procedural Map-Reduce
• Quick start & interoperability
• Modes: interactive, batch, embedded
• User-defined functions
• Nested data model
Outline
• Map-Reduce and the need for Pig Latin
• Pig Latin
• Compilation into Map-Reduce
• Optimization
• Future Work
Pig Process Life Cycle
Parser: Pig Latin to logical plan
Logical Optimizer
Logical plan to physical plan
Map-Reduce Compiler
Map-Reduce Optimizer
Hadoop Job Manager
Pig Latin to Physical Plan
A = LOAD 'file1' AS (x,y,z);
B = LOAD 'file2' AS (t,u,v);
C = FILTER A BY y > 0;
D = JOIN C BY x, B BY u;
E = GROUP D BY z;
F = FOREACH E GENERATE group, COUNT(D);
STORE F INTO 'output';
Logical plan: the two LOADs feed a FILTER (on A) and a JOIN, followed by GROUP, FOREACH, and STORE; the schema grows from (x,y,z) to (x,y,z,t,u,v) at the join and becomes (group, count) after the FOREACH.
Logical Plan to Physical Plan
In the physical plan, each relational operator is expanded: GROUP and JOIN each become a LOCAL REARRANGE → GLOBAL REARRANGE → PACKAGE sequence (the join adds a FOREACH to flatten the packaged bags), while LOAD, FILTER, FOREACH, and STORE carry over directly.
Physical Plan to Map-Reduce Plan
The physical operators are then assigned to map-reduce stages: LOAD, FILTER, and LOCAL REARRANGE run in the map phase, each GLOBAL REARRANGE becomes a shuffle, and PACKAGE and FOREACH run in the reduce phase, yielding one map-reduce job per rearrange boundary.
Implementation
user → Pig Latin (or SQL, via automatic rewrite + optimization) → Hadoop Map-Reduce → cluster
Pig is open source: http://hadoop.apache.org/pig
• ~50% of Hadoop jobs at Yahoo! are Pig
• 1000s of jobs per day
Compilation into Map-Reduce
Map1/Reduce1: Load Visits, Group by url
Map2/Reduce2: Foreach url generate count; Load Url Info; Join on url
Map3/Reduce3: Group by category, Foreach category generate top10(urls)
Every group or join operation forms a map-reduce boundary
Other operations pipelined into map and reduce phases
Nested Sub Plans
A SPLIT operator feeds multiple nested sub-plans (FILTER and FOREACH pipelines) on the map side; on the reduce side, the corresponding PACKAGE → FOREACH pipelines are fed through a MULTIPLEX operator that routes each tuple to the proper sub-plan.
Outline
• Map-Reduce and the need for Pig Latin
• Pig Latin
• Compilation into Map-Reduce
• Optimization
• Future Work
Using the Combiner
Input records such as (k1,v1), (k2,v2), (k1,v3), (k2,v4), (k1,v5) are partially combined within each map task, so fewer pairs per key are shipped to the reducers that produce the output records.

Can pre-process data on the map side to reduce the data shipped:
• Algebraic aggregation functions
• DISTINCT processing
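For an algebraic function such as SUM, the combiner idea can be sketched as follows (a toy simulation; "algebraic" means partial results can be merged without changing the answer):

```python
from collections import Counter

def map_with_combiner(records):
    # Combine values per key inside the map task before shipping:
    # only one (key, partial_sum) pair per key leaves this mapper.
    partial = Counter()
    for k, v in records:
        partial[k] += v
    return list(partial.items())

def reduce_sums(shipped):
    # The reducer merges the partial sums; since SUM is algebraic
    # (SUM(a + b) = SUM(a) + SUM(b)), the final result is unchanged.
    total = Counter()
    for pairs in shipped:
        for k, v in pairs:
            total[k] += v
    return dict(total)

map1 = map_with_combiner([("k1", 1), ("k2", 2), ("k1", 3)])
map2 = map_with_combiner([("k2", 4), ("k1", 5)])
result = reduce_sums([map1, map2])
# result == {"k1": 9, "k2": 6}; map1 ships 2 pairs instead of 3
```

Holistic functions (e.g., MEDIAN) cannot be combined this way, which is why Pig restricts map-side combining to algebraic UDFs and DISTINCT.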
Skew Join
• Default join method is symmetric hash join.
After grouping, all result and revenue tuples for a key (e.g. Lakers or Kings) land in the same group, and the cross product for that key is carried out on a single reducer.
• Problem if too many values share the same key
• Skew join samples the data to find frequent keys
• It then further splits those keys among reducers
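The splitting idea can be sketched like this (a toy partitioner with invented names; real Pig samples the input rather than scanning it, and tracks multiplicities more carefully):

```python
import random
from collections import Counter

def partition_skew_join(left, right, n_reducers, hot_threshold=2):
    # Find frequent ("hot") keys; the real implementation samples,
    # here we simply count the left input for illustration.
    freq = Counter(k for k, _ in left)
    hot = {k for k, c in freq.items() if c >= hot_threshold}

    reducers = [([], []) for _ in range(n_reducers)]
    for k, v in left:
        # A hot key's left tuples are spread across all reducers;
        # ordinary keys are hashed to a single reducer as usual.
        r = random.randrange(n_reducers) if k in hot else hash(k) % n_reducers
        reducers[r][0].append((k, v))
    for k, v in right:
        # Right tuples of a hot key are replicated to every reducer,
        # so each left tuple still meets all of its join partners.
        targets = range(n_reducers) if k in hot else [hash(k) % n_reducers]
        for r in targets:
            reducers[r][1].append((k, v))
    return reducers

# Each reducer now joins its two local lists; the union over reducers
# equals the full join, but no reducer holds all tuples of a hot key.
```

The trade-off is replication of the smaller side of a hot key in exchange for bounding the per-reducer cross product.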
Fragment-Replicate Join
• Symmetric hash join repartitions both inputs
• If size(data set 1) >> size(data set 2):
– Just replicate data set 2 to all partitions of data set 1
• Translates to a map-only job
– Open data set 2 as a "side file"
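A sketch of the map-only pattern in Python (hypothetical helper name; each "fragment" stands in for one map task's slice of the big input, and the small input plays the role of the side file):

```python
def fragment_replicate_join(big_fragments, small, key_big, key_small):
    # Build a hash table on the small data set once (the "side file")...
    table = {}
    for t in small:
        table.setdefault(key_small(t), []).append(t)
    # ...then each map task streams its fragment of the big data set
    # and probes the table locally: no shuffle or reduce phase needed.
    out = []
    for fragment in big_fragments:
        for t in fragment:
            for match in table.get(key_big(t), []):
                out.append(t + match)
    return out

visits = [[("Amy", "cnn.com"), ("Amy", "bbc.com")],   # map task 1's fragment
          [("Fred", "cnn.com")]]                      # map task 2's fragment
url_info = [("cnn.com", "News"), ("bbc.com", "News")]
joined = fragment_replicate_join(visits, url_info,
                                 key_big=lambda t: t[1],
                                 key_small=lambda t: t[0])
# three joined tuples, produced without any reduce phase
```

The technique only pays off when the replicated side fits comfortably in each map task's memory.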
Merge Join
• Exploits the case where both data sets are already sorted on the join key
• Again a map-only job
– Open the other data set as a "side file"
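The single-pass merge over two key-sorted inputs can be sketched as (an in-memory toy, not Pig's operator):

```python
def merge_join(left, right):
    # Both inputs are sorted by key; advance two cursors in lockstep.
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Collect the run of equal keys on each side, then pair them.
            i2 = i
            while i2 < len(left) and left[i2][0] == lk:
                i2 += 1
            j2 = j
            while j2 < len(right) and right[j2][0] == rk:
                j2 += 1
            for l in left[i:i2]:
                for r in right[j:j2]:
                    out.append(l + r[1:])
            i, j = i2, j2
    return out

left = [("bbc.com", "Amy"), ("cnn.com", "Amy"), ("cnn.com", "Fred")]
right = [("bbc.com", "News"), ("cnn.com", "News")]
# merge_join(left, right) pairs matching keys in one sorted pass
```

Because neither input needs repartitioning or sorting, the join costs a single sequential scan of each file.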
Multiple Data Flows [1]
Load Users → Filter bots, then two branches:
• Group by state → Apply UDFs → Store into 'bystate'
• Group by demographic → Apply UDFs → Store into 'bydemo'
(Map1 / Reduce1)
Multiple Data Flows [2]
The same flow with an explicit SPLIT after "Filter bots" and a DEMULTIPLEX before the two stores, so that both branches execute within a single map-reduce job (Map1 / Reduce1):
Load Users → Filter bots → Split:
• Group by state → Apply UDFs → Store into 'bystate'
• Group by demographic → Apply UDFs → Store into 'bydemo'
Performance
Strong & Weak Points
Strengths:
• Explicit dataflow
• Retains the properties of Map-Reduce: scalability, fault tolerance
• Multi-way processing
• Open source

Weaknesses:
• Column-wise storage structures are missing
• Memory management
• No facilitation for non-Java users
• Limited optimization
• No GUI for flow graphs
"The step-by-step method of creating a program in Pig is much cleaner and simpler to use than the single-block method of SQL. It is easier to keep track of what your variables are, and where you are in the process of analyzing your data."
Jasmine Novak, Engineer, Yahoo!
"With the various interleaved clauses in SQL, it is difficult to know what is actually happening sequentially."
David Ciemiewicz, Search Excellence, Yahoo!
Hot Competitor
Google Map-Reduce System
Installation Process
1. Download a Java editor (NetBeans or Eclipse)
2. Create sample Pig Latin code in the editor
3. Install the Pig plugins (JavaCC, Subclipse)
4. Add the necessary jar files for the Hadoop API to the project
5. Run the input files from the editor using the Hadoop API
NOTE: Your system must work as a distributed cluster.
Another NOTE: To run the sample from the command line, install additional software:
- JUnit, Ant, CygWin
- And set your path variables everywhere
New Systems For Data Analysis
Map-Reduce
Apache Hadoop
Dryad
Sawzall
Hive
. . .
Outline
• Map-Reduce and the need for Pig Latin
• Pig Latin
• Compilation into Map-Reduce
• Optimization
• Future Work
Future / In-Progress Tasks
• Columnar-storage layer
• Non-Java UDFs and a SQL interface
• Metadata repository
• GUI for Pig
• Tight integration with a scripting language
– Use loops, conditionals, and functions of the host language
• Memory management & enhanced optimization
• Project suggestions at:
http://wiki.apache.org/pig/ProposedProjects
Summary
• Big demand for parallel data processing
– Emerging tools do not look like SQL DBMSs
– Programmers like dataflow pipes over static files
• Hence the excitement about Map-Reduce
• But, Map-Reduce is too low-level and rigid
Pig Latin: a sweet spot between map-reduce and SQL