Pig Programming
Viraj Bhat ([email protected]), Cloud Computing & Data Infrastructure Group, Yahoo! Inc.
University of Illinois at Urbana-Champaign
Feb 13, 2009
- 2 -
About Me
• Part of the Yahoo! Grid Team since May 2008
• PhD from Rutgers University, NJ
– Specialization in Data Streaming, Grid, Autonomic Computing
• Worked on streaming data from live simulations executing at NERSC (CA) and ORNL (TN) to the Princeton Plasma Physics Lab (PPPL, NJ)
– Library introduces less than 5% overhead on computation
• PhD Thesis on In-Transit data processing for peta-scale simulation workflows
• Developed CorbaCoG kit for Globus
- 3 -
Agenda
• Introduction to Pig -- Session I - [11 am -12 pm]
– Overview
• Pig Latin overview
• Pig Examples
• Pig Architecture
• Pig Latin details
• Additional Topics
– Pig Streaming and Parameter substitution
– Translating SQL queries into Pig Latin
• Hands-on Pig -- Session II [1 pm - 4 pm]
– Running, debugging, optimization
- 4 -
Hadoop and Pig
Hadoop is great for distributed computation but..
1. Several simple, frequently-used operations
Filtering, projecting, grouping, aggregation, joining, sorting commonly used by users at Yahoo!
2. Users’ end goal typically requires several Map-Reduce jobs
Thus, making it hard to maintain, extend, debug, optimize
- 5 -
What is PIG?
• SQL-like language (Pig Latin) for processing large semi-structured data sets using Hadoop M/R
– Pig is a Hadoop (Apache) sub-project
– ~30% of Grid users at Yahoo! use Pig
• Pig Latin
– Pig is a procedural (data flow) language
• Allows users to specify an explicit sequence of steps for data processing
• Think of it as a scripting harness to chain together multiple map-reduce jobs (a short sketch follows)
– Supports the relational model
• But NOT very strongly typed
• Supports primitive relational operators such as FILTER, PROJECTION, JOIN, GROUPING, etc.
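As a minimal, hypothetical sketch of this data-flow style (the file name, field names, and filter condition are assumptions for illustration only), a script that chains a load, filter, group, and aggregate:

visits = LOAD 'visits' AS (user, url, time);          -- load a tab-delimited file
cnn    = FILTER visits BY url matches '.*cnn.*';      -- relational FILTER
grpd   = GROUP cnn BY user;                           -- GROUPING
counts = FOREACH grpd GENERATE group, COUNT(cnn);     -- projection + aggregation
STORE counts INTO 'cnn_visit_counts';                 -- the chain is compiled into map-reduce jobs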
- 6 -
Advantages of using Pig
• Pig can be treated as a higher level language
– Increases programmer productivity
– Decreases duplication of effort
– Opens the M/R programming system to more users
• Pig insulates against hadoop complexity
– Hadoop version upgrades
– JobConf configuration tuning
- 7 -
Pig making Hadoop Easy
• How do we find the top 100 most-visited sites by users aged 18 to 25?
• The native Hadoop Java solution requires roughly 20x more lines of code than the Pig script
• The native Hadoop Java solution requires roughly 16x more development time than the Pig script
• The native Hadoop Java code runs in 11 minutes, while the Pig script takes 20 minutes
import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.mapred.lib.IdentityMapper; public class MRExample { public static class LoadPages extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable k, Text val, OutputCollector<Text, Text> oc, Reporter reporter) throws IOException { // Pull the key out String line = val.toString(); int firstComma = line.indexOf(','); String key = line.substring(0, firstComma); String value = line.substring(firstComma + 1); Text outKey = new Text(key); // Prepend an index to the value so we know which file // it came from. Text outVal = new Text("1" + value); oc.collect(outKey, outVal); } } public static class LoadAndFilterUsers extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable k, Text val, OutputCollector<Text, Text> oc, Reporter reporter) throws IOException { // Pull the key out String line = val.toString(); int firstComma = line.indexOf(','); String value = line.substring(firstComma + 1); int age = Integer.parseInt(value); if (age < 18 || age > 25) return; String key = line.substring(0, firstComma); Text outKey = new Text(key); // Prepend an index to the value so we know which file // it came from. Text outVal = new Text("2" + value); oc.collect(outKey, outVal); } } public static class Join extends MapReduceBase implements Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterator<Text> iter, OutputCollector<Text, Text> oc, Reporter reporter) throws IOException { // For each value, figure out which file it's from and store it // accordingly. List<String> first = new ArrayList<String>(); List<String> second = new ArrayList<String>(); while (iter.hasNext()) { Text t = iter.next(); String value = t.toString();
if (value.charAt(0) == '1') first.add(value.substring(1)); else second.add(value.substring(1)); reporter.setStatus("OK"); } // Do the cross product and collect the values for (String s1 : first) { for (String s2 : second) { String outval = key + "," + s1 + "," + s2; oc.collect(null, new Text(outval)); reporter.setStatus("OK"); } } } } public static class LoadJoined extends MapReduceBase implements Mapper<Text, Text, Text, LongWritable> { public void map( Text k, Text val, OutputCollector<Text, LongWritable> oc, Reporter reporter) throws IOException { // Find the url String line = val.toString(); int firstComma = line.indexOf(','); int secondComma = line.indexOf(',', firstComma); String key = line.substring(firstComma, secondComma); // drop the rest of the record, I don't need it anymore, // just pass a 1 for the combiner/reducer to sum instead. Text outKey = new Text(key); oc.collect(outKey, new LongWritable(1L)); } } public static class ReduceUrls extends MapReduceBase implements Reducer<Text, LongWritable, WritableComparable, Writable> { public void reduce( Text key, Iterator<LongWritable> iter, OutputCollector<WritableComparable, Writable> oc, Reporter reporter) throws IOException { // Add up all the values we see long sum = 0; while (iter.hasNext()) { sum += iter.next().get(); reporter.setStatus("OK"); } oc.collect(key, new LongWritable(sum)); } } public static class LoadClicks extends MapReduceBase implements Mapper<WritableComparable, Writable, LongWritable, Text> { public void map( WritableComparable key, Writable val, OutputCollector<LongWritable, Text> oc, Reporter reporter) throws IOException { oc.collect((LongWritable)val, (Text)key); } } public static class LimitClicks extends MapReduceBase implements Reducer<LongWritable, Text, LongWritable, Text> { int count = 0; public void reduce( LongWritable key, Iterator<Text> iter, OutputCollector<LongWritable, Text> oc, Reporter reporter) throws IOException { // Only output the first 100 records while (count < 100 && iter.hasNext()) { oc.collect(key, iter.next()); count++; } }
} public static void main(String[] args) throws IOException { JobConf lp = new JobConf(MRExample.class); lp.setJobName("Load Pages"); lp.setInputFormat(TextInputFormat.class); lp.setOutputKeyClass(Text.class); lp.setOutputValueClass(Text.class); lp.setMapperClass(LoadPages.class); FileInputFormat.addInputPath(lp, new Path("/user/gates/pages")); FileOutputFormat.setOutputPath(lp, new Path("/user/gates/tmp/indexed_pages")); lp.setNumReduceTasks(0); Job loadPages = new Job(lp); JobConf lfu = new JobConf(MRExample.class); lfu.setJobName("Load and Filter Users"); lfu.setInputFormat(TextInputFormat.class); lfu.setOutputKeyClass(Text.class); lfu.setOutputValueClass(Text.class); lfu.setMapperClass(LoadAndFilterUsers.class); FileInputFormat.addInputPath(lfu, new Path("/user/gates/users")); FileOutputFormat.setOutputPath(lfu, new Path("/user/gates/tmp/filtered_users")); lfu.setNumReduceTasks(0); Job loadUsers = new Job(lfu); JobConf join = new JobConf(MRExample.class); join.setJobName("Join Users and Pages"); join.setInputFormat(KeyValueTextInputFormat.class); join.setOutputKeyClass(Text.class); join.setOutputValueClass(Text.class); join.setMapperClass(IdentityMapper.class); join.setReducerClass(Join.class); FileInputFormat.addInputPath(join, new Path("/user/gates/tmp/indexed_pages")); FileInputFormat.addInputPath(join, new Path("/user/gates/tmp/filtered_users")); FileOutputFormat.setOutputPath(join, new Path("/user/gates/tmp/joined")); join.setNumReduceTasks(50); Job joinJob = new Job(join); joinJob.addDependingJob(loadPages); joinJob.addDependingJob(loadUsers); JobConf group = new JobConf(MRExample.class); group.setJobName("Group URLs"); group.setInputFormat(KeyValueTextInputFormat.class); group.setOutputKeyClass(Text.class); group.setOutputValueClass(LongWritable.class); group.setOutputFormat(SequenceFileOutputFormat.class); group.setMapperClass(LoadJoined.class); group.setCombinerClass(ReduceUrls.class); group.setReducerClass(ReduceUrls.class); FileInputFormat.addInputPath(group, new Path("/user/gates/tmp/joined")); FileOutputFormat.setOutputPath(group, new Path("/user/gates/tmp/grouped")); group.setNumReduceTasks(50); Job groupJob = new Job(group); groupJob.addDependingJob(joinJob); JobConf top100 = new JobConf(MRExample.class); top100.setJobName("Top 100 sites"); top100.setInputFormat(SequenceFileInputFormat.class); top100.setOutputKeyClass(LongWritable.class); top100.setOutputValueClass(Text.class); top100.setOutputFormat(SequenceFileOutputFormat.class); top100.setMapperClass(LoadClicks.class); top100.setCombinerClass(LimitClicks.class); top100.setReducerClass(LimitClicks.class); FileInputFormat.addInputPath(top100, new Path("/user/gates/tmp/grouped")); FileOutputFormat.setOutputPath(top100, new Path("/user/gates/top100sitesforusers18to25")); top100.setNumReduceTasks(1); Job limit = new Job(top100); limit.addDependingJob(groupJob); JobControl jc = new JobControl("Find top 100 sites for users 18 to 25"); jc.addJob(loadPages);
Users = LOAD 'users' AS (name, age);
Fltrd = FILTER Users by age >= 18 and age <= 25;
Pages = LOAD 'pages' AS (user, url);
Jnd = JOIN Fltrd BY name, Pages BY user;
Grpd = GROUP Jnd by url;
Smmd = FOREACH Grpd GENERATE group, COUNT(Jnd) AS clicks;
Srtd = ORDER Smmd BY clicks desc;
Top100 = LIMIT Srtd 100;
STORE Top100 INTO 'top100sites';
Solution in Native Hadoop M/R program
Solution using Pig Script
Pig Latin
- 9 -
Introduction to Pig Latin
• Pig Latin is the dataflow language in which users write Pig programs
• Pig Latin consists of the following data types
– Data Atom
• Simple atomic data value, e.g. 'alice' or '1.0'
– Tuple
• Data record consisting of a sequence of "fields" (similar to a record in SQL), e.g. (alice,lakers) or (alice,kings)
– Data Bag
• Set of tuples, equivalent to a "table" in SQL terminology, e.g. {(alice,lakers), (alice,kings)}
– Data Map
• A set of key-value pairs, e.g. [likes#(lakers,kings), age#22]
- 10 -
Pig Latin: Data Types
Type                                         Example
1. Data Atom – an atomic value               'alice'
2. Tuple – a record                          (alice, lakers)
3. Data Bag – collection supporting scan     { (alice, lakers), (alice, kings) }
4. Data Map – collection with key lookup     [ 'likes' # { (lakers), (kings) }, 'age' # 22 ]
- 11 -
Pig Types
• Pig supports the following types, in addition to map, bag, and tuple (a typed load example follows)
– int: 32-bit signed integer
– long: 64-bit signed integer, e.g. (alice, 25, 6L)
– float: 32-bit floating point, e.g. (alice, 25, 5.6f), (alice, 25, 1.92e2f)
– double: 64-bit floating point, e.g. (alice, 22, 1.92), (alice, 22, 1.92e2)
– chararray: string of characters stored in Unicode, e.g. ('alice', 'bob')
– bytearray: array of bytes; the default type if you do not specify the type of a field
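To show these types in a schema, here is a minimal, hypothetical load statement (the file name, delimiter, and field names are assumptions for illustration):

users = LOAD 'users.txt' USING PigStorage('\t')
        AS (name: chararray, age: int, clicks: long, score: float, rank: double, raw: bytearray);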
- 12 -
Expressions
Sample tuple of a: ( 1, { (2,3), (4,6) }, [ 'yahoo'#'mail' ] )
Schema of a: (f1: int, f2: bag{t: tuple(n1: int, n2: int)}, f3: map[])
a = name of the alias/relation, which contains the types int, bag of tuples, and map; note: the bag and tuple keywords in the schema are optional.

• Field referred to by position: $0 = 1, $1 = { (2,3), (4,6) }, $2 = [ 'yahoo'#'mail' ]
• Field referred to by name: f1 = 1, f2 = { (2,3), (4,6) }, f3 = [ 'yahoo'#'mail' ]
• Projection of a data item: f2.$0 = { (2), (4) }
• Map lookup: f3#'yahoo' = 'mail'
• Function application: COUNT(f2) = 2L, SUM(f2.$0) = 6 (2+4)
- 13 -
Conditions (Boolean Expressions)
• ==, !=, >, >=, <, <= for numerical comparison
– e.g. f1 == 1 works, since f1 is of type int
• eq, neq, gt, gte, lt, lte for string comparison (deprecated; does not force the operands to strings)
– e.g. f3#'yahoo' gt 'ma'
• matches for regular expression matching
– e.g. f3#'yahoo' matches '(?i)MAIL'
Sample tuple of a, as before: ( 1, { (2,3), (4,6) }, [ 'yahoo'#'mail' ] ), with schema a (f1: int, f2: bag{t: tuple(n1: int, n2: int)}, f3: map[]); a is the alias containing the types int, bag of tuples, and map. A filter using these conditions is sketched below.
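As a minimal, hypothetical illustration of these comparisons inside a statement (aliases follow the sample schema above):

b = FILTER a BY f1 == 1;                         -- numerical comparison
c = FILTER a BY f3#'yahoo' matches '(?i)MAIL';   -- regular expression match on a map value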
- 14 -
Pig Latin Operators and Conditional Expression
• Logical operators: AND, OR, NOT
– e.g. f1 == 1 AND f3#'yahoo' eq 'mail'
• Conditional expression (aka BinCond): (condition ? exp1 : exp2)
– e.g. f3#'yahoo' matches '(?i)MAIL' ? 'matched' : 'notmatched'
Sample tuple of a, as before: ( 1, { (2,3), (4,6) }, [ 'yahoo'#'mail' ] ), with schema a (f1: int, f2: bag{t: tuple(n1: int, n2: int)}, f3: map[]); a is the alias containing the types int, bag of tuples, and map. A bincond used inside a foreach is sketched below.
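For illustration, a hypothetical foreach that applies the bincond above to each record of a (the output alias is an assumption):

labelled = FOREACH a GENERATE f1, (f3#'yahoo' matches '(?i)MAIL' ? 'matched' : 'notmatched');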
Pig Latin Language by Example
- 16 -
Examples
1. Word Count
2. Computing average number of page visits by user
3. Identify users who visit highly ranked or Good pages
4. Namenode Audit Log processing via UDF’s in Chukwa (User Defined Functions)
- 17 -
Word count using Pig
wc.txt (sample input):
program program
pig pig
program pig
hadoop pig
latin latin
latin latin

• Count the words in a text file separated by lines and spaces
• Basic idea:
– Load the file using a loader
– Foreach record, generate word tokens
– Group by each word
– Count the number of words in each group
– Store the result to a file
- 18 -
Word Count using Pig
myinput = load '/user/viraj/wc.txt' USING TextLoader() as (myword:chararray);
words = FOREACH myinput GENERATE FLATTEN(TOKENIZE(*));
grouped = GROUP words BY $0;
counts = FOREACH grouped GENERATE group, COUNT(words);
store counts into '/user/viraj/pigoutput' using PigStorage();
Writes the output to the HDFS files /user/viraj/pigoutput/part-*
- 19 -
Computing average number of page visits by user
Visits log (user, url, time):
Amy   www.cnn.com        8:00
Amy   www.crap.com       8:05
Amy   www.myblog.com     10:00
Amy   www.flickr.com     10:05
Fred  cnn.com/index.htm  12:00
Fred  cnn.com/index.htm  1:00
. . .
• The log of users visiting web pages consists of (user, url, time) records
• The fields of the log are tab separated and in text format
• Basic idea:
– Load the log file
– Group based on the user field
– Count each group
– Calculate the average over all users
- 20 -
How to program this in Pig Latin
VISITS = load '/user/viraj/visits' as (user, url, time);
USER_VISITS = group VISITS by user;
USER_CNTS = foreach USER_VISITS generate group as user, COUNT(VISITS) as numvisits;
ALL_CNTS = group USER_CNTS all;
AVG_CNT = foreach ALL_CNTS generate AVG(USER_CNTS.numvisits);
dump AVG_CNT;
- 21 -
Identify users who visit “Good Pages”
Pages log (url, pagerank):
www.cnn.com     0.9
www.flickr.com  0.9
www.myblog.com  0.7
www.crap.com    0.2
. . .

Visits log (user, url, time):
Amy   www.cnn.com        8:00
Amy   www.crap.com       8:05
Amy   www.myblog.com     10:00
Amy   www.flickr.com     10:05
Fred  cnn.com/index.htm  12:00
Fred  cnn.com/index.htm  1:00
. . .
• Good pages are those whose pagerank is greater than 0.5 (e.g. www.cnn.com, www.flickr.com, and www.myblog.com above); we want the users whose visited pages have a high average pagerank
• Basic idea
– Join the tables on url
– Group by user
– Calculate the average pagerank of the pages each user visited
– Filter for users whose average pagerank is greater than 0.5
– Store the result
- 22 -
How to Program this in Pig
VISITS = load '/user/viraj/visits.txt' as (user:chararray, url:chararray, time:chararray);
PAGES = load '/user/viraj/pagerank.txt' as (url:chararray, pagerank:float);
VISITS_PAGES = join VISITS by url, PAGES by url;
Load files for processing with appropriate types
Join them on the url field of each table
- 23 -
How to Program this in Pig
USER_VISITS = group VISITS_PAGES by user;
USER_AVGPR = foreach USER_VISITS generate group, AVG(VISITS_PAGES.pagerank) as avgpr;
GOOD_USERS = filter USER_AVGPR by avgpr > 0.5f;
store GOOD_USERS into '/user/viraj/goodusers';
Group by user, in our case Amy, Fred
Generate an average pagerank per user
Filter records based on pagerank
Store results in hdfs
- 24 -
Namenode Audit log processing
(1232063871538L, [capp#/hadoop/log/audit.log,ctags#cluster="cluster2", body#2009-01-15 23:57:51,538 INFO org.apache.hadoop.fs.FSNamesystem.audit: ugi=users,hdfs<tab> ip=/11.11.11.11<tab>cmd=mkdirs src=/user/viraj/output/part-0000<tab>dst=null<tab> perm=users:rwx------ ,csource#[email protected]])
(1232063879123L, [capp#/hadoop/log/audit.log,ctags#cluster="cluster2",body#2009-01-15 23:57:59,123 INFO org.apache.hadoop.fs.FSNamesystem.audit: ugi=users,hdfs ip=/11.12.11.12 <tab>cmd=rename<tab>src=/user/viraj/output/part-0001<tab> dst=/user/viraj/output/part-0002<tab>perm=users:rw-------, csource#[email protected]])
audit.log
• Count the number of HDFS operations that took place
• Basic idea:
– Load the file using a custom loader, ChukwaStorage, into a long column and a list of map fields
– Extract the value of the body key, since it contains the cmd the namenode executed
– Split the body value on tabs to extract only the cmd value, using a UDF
– Group by cmd value and count each group
- 25 -
How to Program this in Pig
• register chukwa-core.jar
• register chukwaexample.jar
• A = load '/user/viraj/Audit.log' using chukwaexample.ChukwaStorage() as (ts: long, fields);
• B = FOREACH A GENERATE fields#'body' as log;
• C = FOREACH B GENERATE FLATTEN(chukwaexample.Cut(log)) as (timestamp, ip, cmd, src, dst, perm);
Register the jars which contain your udf’s
Jar internally used by ChukwaStorage
Contains both ChukwaStorage and Cut classes
Project the right value for key body
Cut the body based on tabs to extract “cmd”
- 26 -
How to Program this in Pig
• D = FOREACH C GENERATE cmd as cmd:chararray;
• E = group D by cmd;
• F = FOREACH E GENERATE group, COUNT(D) as count;
• dump F;
Project the right column for use in group
Group by command type
Count the commands and dump
Pig Architecture
- 28 -
Pig Architecture - Map-Reduce as Backend

[Diagram] A user submits a Pig program (or, conceptually, SQL). Pig parses it into a Logical Plan, automatically rewrites and optimizes it, translates it into a Physical Plan and then an M/R Plan, and executes that plan on the Map-Reduce cluster.
• Pig is a high-level language to express computation and execute it over hadoop
- 29 -
Pig Architecture
• Front End (runs on the user machine or gateway)
– Parsing
– Semantic checking
– Planning
• Back End (runs on the grid)
– Script execution
- 30 -
Front End Architecture

Pig Latin programs pass through a front-end pipeline:
Pig Latin → Query Parser → Logical Plan → Semantic Checking → Logical Plan → Logical Optimizer → Optimized Logical Plan → Logical to Physical Translator → Physical Plan → Physical to M/R Translator → MapReduce Plan → Map Reduce Launcher (creates a job jar to be submitted to the hadoop cluster)
- 31 -
Logical Plan
• Consists of a DAG with Logical Operators as nodes and the data flow represented as edges
– Logical Operators contain a list of inputs, outputs, and a schema
• Logical operators
– Aid in post-parse-stage checking (type checking)
– Optimization
– Translation to the Physical Plan
• Example script and its logical plan, Load → Filter (Proj($0) > Const(5)) → Store:
a = load 'myfile';
b = filter a by $0 > 5;
store b into 'myfilteredfile';
- 32 -
Physical Plan
• Layer to map Logical Plan to multiple back-ends, one such being M/R (Map Reduce)
– Chance for code re-use if multiple back-ends share same operator
• Consists of operators which pig will run on the backend
• Currently most of the physical plan is placed as operators in the map reduce plan
• Logical to Physical Translation
– 1:1 correspondence for most Logical operators
– Except Cross, Distinct, Group, Co-group and Order
- 33 -
Logical to Physical Plan for Group operator
• Logical operator for co-group/group is converted to 3 Physical operators
– Local Rearrange (LR)
– Global Rearrange (GR)
– Package (PKG)
• Example:
– group A by Acol1, B by Bcol1
[Diagram] Data flow for the example, with tuples shown as {Key, (Value)} annotated by table number:
• Input tuples: A = (1,R), (2,G) keyed on Acol1; B = (1,B), (2,Y) keyed on Bcol1
• Local Rearrange (LR, one per input) emits key-value pairs tagged with the table number:
  {1,(1,R)}1 and {2,(2,G)}1 from A; {1,(1,B)}2 and {2,(2,Y)}2 from B
• Global Rearrange (GR) brings all values for a key together as {Key, {list of values}}:
  {1, {(1,R)1, (1,B)2}} and {2, {(2,G)1, (2,Y)2}}
• Package (PKG) separates each key's values into one bag per input table:
  {1, {(1,R)1}, {(1,B)2}} and {2, {(2,G)1}, {(2,Y)2}}
- 34 -
Map Reduce Plan
• Physical to Map Reduce (M/R) Plan conversion happens through the MRCompiler
– Converts a physical plan into a DAG of M/R operators
• Boundaries for M/R include cogroup/group, distinct, cross, order by, limit (in some cases)
– All operators between one cogroup and the next cogroup are pushed into the reduce phase of the first job
– order by is implemented as 2 M/R jobs
• JobControlCompiler then uses the M/R plan to construct a JobControl object
[Diagram] Example chain load → filter → group/cogroup: the cogroup marks the boundary between map1 and reduce1; the commands that follow it (C1 … Ci) run in that reduce, and the next cogroup (Ci+1) begins a new job, mapi/reducei.
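To see the plans Pig builds for a script, the explain command can be used from the grunt shell; a minimal sketch with assumed aliases and file name:

grunt> A = load 'myfile';
grunt> B = group A by $0;
grunt> C = foreach B generate group, COUNT(A);
grunt> explain C;    -- prints the logical, physical, and map-reduce plans for C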
- 35 -
Back End Architecture

• Pig operators are placed in a pipeline, and each record is pulled through it.
• Example script: A = load …; B = foreach …; C = filter …; D = group …; …
• [Diagram] For this script, the map phase contains the foreach and filter operators: each record read from hadoop flows through For Each → Filter → Prep for reduce, and is then collected.
- 36 -
Lazy Execution
• Nothing really executes until you request output
• Only when a store, dump, explain, describe, or illustrate command is encountered does M/R execution take place, either on your cluster or on the front end (see the sketch below)
• Advantages
– In-memory pipelining
– Filter re-ordering across multiple commands
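As a minimal illustration of lazy execution (the path and aliases are assumptions), nothing runs while the load and filter statements are entered; a map-reduce job is launched only at the dump:

grunt> A = load '/user/viraj/wc.txt';     -- no job launched yet
grunt> B = filter A by $0 is not null;    -- still no job
grunt> dump B;                            -- only now is a map-reduce job submitted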
Commands for manipulating data sets
- 38 -
Loading Data using PigStorage
query = load '/user/viraj/query.txt' using PigStorage() as (userId, queryString, timestamp);
queries = load '/user/viraj/querydir/part-*' using PigStorage() as (userId, queryString, timestamp);
queries = load 'query_log.txt' using PigStorage('\u0001') as (userId, queryString, timestamp);

• The load path can be a single file or a path with globbing: all files under querydir matching part-* are loaded
• By default, the userId, queryString, and timestamp fields need to be tab separated
• PigStorage('{delimiter}') loads records with fields delimited by {delimiter}, given as a unicode escape: '\u0001' for Ctrl+A, '\u007C' for the pipe character |
- 39 -
Storing data using PigStorage and viewing data

store joined_result into '/user/viraj/output';
store joined_result into '/user/viraj/myoutput/' using PigStorage('\u0001');
store joined_result into '/user/viraj/myoutputbin' using BinStorage();
store sports_views into '/user/viraj/myoutput' using PigDump();
dump sports_views;

• PigStorage is the default if you do not specify anything
– Results are stored as ASCII text in part-* files under the output directory
• As with load, you can use PigStorage('{delimiter}')
– Stores records with fields delimited by the unicode {delimiter}
– The default, if you specify nothing, is tab
– Store Ctrl+A-separated output by specifying PigStorage('\u0001')
• BinStorage stores arbitrarily nested data
– Used for loading intermediate results that were previously stored using it
• Dump displays data on the terminal, e.g. (alice,lakers,3L)(alice,lakers,0.7f)
– Uses the PigDump storage class internally
- 40 -
Filtering Data
queries = load 'query_log.txt' using PigStorage('\u0001') as (userId: chararray, queryString: chararray, timestamp: int);
filtered_queries = filter queries by timestamp > 1;
filtered_queries_pig20 = filter queries by timestamp is not null;
store filtered_queries into 'myoutput' using PigStorage();

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

filtered_queries: (userId, queryString, timestamp)
(alice, iPod, 3) (alice, lakers, 4)

filtered_queries_pig20: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)
- 41 -
(Co)Grouping Data
Data 1: queries: (userId: chararray, queryString: chararray, timestamp: int)
Data 2: sports_views: (userId: chararray, team: chararray, timestamp: int)

queries = load 'query.txt' as (…);
sports_views = load 'sports_views.txt' as (…);
team_matches = cogroup sports_views by (userId, team), queries by (userId, queryString);
- 42 -
Example of Cogrouping
sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)
- 43 -
Example of Cogrouping
sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

team_matches = cogroup sports_views by (userId, team), queries by (userId, queryString);

team_matches: (group, sports_views, queries)
- 44 -
Example of Cogrouping
sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

After the cogroup, team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
- 45 -
Example of Cogrouping
sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

After the cogroup, team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
( (alice, iPod), { }, { (alice, iPod, 3) } )
- 46 -
Cogrouping
sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

After the cogroup, team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
( (alice, iPod), { }, { (alice, iPod, 3) } )

You can operate on grouped data just as on base data, using foreach.
- 47 -
Filtering records of Grouped Data
filtered_matches = filter team_matches by COUNT(sports_views) > '0';

team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
( (alice, iPod), { }, { (alice, iPod, 3) } )

After the filter, only the group with a non-empty sports_views bag remains:
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
- 48 -
Per-Record Transformations using foreach
times_and_counts = foreach team_matches generate group, COUNT(sports_views), MAX(queries.timestamp);

team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
( (alice, iPod), { }, { (alice, iPod, 3) } )

times_and_counts: (group, -, -)
( (alice, iPod), 0, 3 )
( (alice, lakers), 2, 4 )
- 49 -
Giving Names to Output Fields in FOREACH
times_and_counts = foreach team_matches generate group, COUNT(sports_views) as count, MAX(queries.timestamp) as max;

team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
( (alice, iPod), { }, { (alice, iPod, 3) } )

times_and_counts: (group, count, max)
( (alice, iPod), 0, 3 )
( (alice, lakers), 2, 4 )
- 50 -
Eliminating Nesting: FLATTEN
expanded_queries1 = foreach queries generate userId, expandQuery(queryString);
expanded_queries2 = foreach queries generate userId, flatten(expandQuery(queryString));

queries: (userId, queryString, timestamp)
(alice, lakers, 1)
(bob, iPod, 3)

expanded_queries1:
( alice, { (lakers rumors), (lakers news) } )
( bob, { (iPod nano), (iPod shuffle) } )

expanded_queries2, after flatten(expandQuery(queryString)):
(alice, lakers rumors)
(alice, lakers news)
(bob, iPod nano)
(bob, iPod shuffle)
- 51 -
Flattening Multiple Items resulting from cogroup

team_matches = cogroup sports_views by (userId, team), queries by (userId, queryString);
joined_result = foreach team_matches generate flatten(sports_views), flatten(queries);
sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

After the cogroup, team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
( (alice, iPod), { }, { (alice, iPod, 3) } )

After the flatten, joined_result:
(alice, lakers, 3, alice, lakers, 1)
(alice, lakers, 3, alice, lakers, 4)
(alice, lakers, 7, alice, lakers, 1)
(alice, lakers, 7, alice, lakers, 4)
- 52 -
Syntactic shortcut: JOIN = COGROUP + FLATTEN
joined_result = join sports_views by (userId, team), queries by (userId, queryString);

sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

After the join, joined_result:
(alice, lakers, 3, alice, lakers, 1)
(alice, lakers, 3, alice, lakers, 4)
(alice, lakers, 7, alice, lakers, 1)
(alice, lakers, 7, alice, lakers, 4)
- 53 -
Eliminating duplicates using distinct
queries = load 'queries.txt' as (userId, queryString: chararray, timestamp);
query_strings = foreach queries generate queryString as qString;
distinct_queries = distinct query_strings;

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

query_strings: (qString)
(lakers) (iPod) (lakers)

distinct_queries: (qString)
(iPod) (lakers)

Duplicate detection is done on bytearrays if the type is not specified.
- 54 -
Ordering Data using Order By

queries = load 'queries.txt' as (userId: chararray, queryString: chararray, timestamp: int); -- Pig 2.0
ordered_timestamp_desc = order queries by timestamp desc;
ordered_qs_desc = order queries by queryString desc;
ordered_qsts_asc = order queries by queryString, timestamp parallel 7;
queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

ordered_timestamp_desc: (alice,lakers,4) (alice,ipod,3) (alice,lakers,1)
ordered_qs_desc: (alice,lakers,1) (alice,lakers,4) (alice,ipod,3)
ordered_qsts_asc: (alice,ipod,3) (alice,lakers,1) (alice,lakers,4)

• Pig supports desc ordering; the default is ascending
• parallel 7 uses 7 reducers for the operation, so 7 part files will be generated
- 55 -
Nested operations
team_matches = cogroup sports_views by (userId, team), queries by (userId, queryString);
answers = foreach team_matches { distinct_qs = DISTINCT queries; generate flatten(group) as mygroup, COUNT(distinct_qs) as mycount; }

Counts the number of distinct queries in each group
sports_views: (userId, team, timestamp)
(alice, lakers, 3) (alice, lakers, 7)

queries: (userId, queryString, timestamp)
(alice, lakers, 1) (alice, iPod, 3) (alice, lakers, 4)

After the cogroup, team_matches: (group, sports_views, queries)
( (alice, lakers), { (alice, lakers, 3), (alice, lakers, 7) }, { (alice, lakers, 1), (alice, lakers, 4) } )
( (alice, iPod), { }, { (alice, iPod, 3) } )

After flatten(group) and COUNT, answers: (mygroup, mycount)
(alice, ipod, 1L)
(alice, lakers, 2L)
- 56 -
New Features
- 57 -
Fragment Replicate Join
• Use a Fragment Replicate Join if you have a small (< 100 MB) table
– J = join big by $0, small by $0 using 'replicated';
• Distributes the processing of the huge file by fragmenting it and replicating the small file to every machine that has a fragment of the huge file (a fuller sketch follows)
– Written as a Pig UDF
– http://wiki.apache.org/pig/PigFRJoin
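As a minimal, hypothetical end-to-end sketch of a fragment replicate join (paths and field names are assumptions; the 'replicated' join syntax is the one shown above):

big   = load '/user/viraj/visits' as (user, url, time);      -- large input, fragmented across the cluster
small = load '/user/viraj/users' as (user, age);             -- small table, replicated to every map task
J     = join big by user, small by user using 'replicated';  -- the relation listed last is the one replicated
store J into '/user/viraj/joined';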
- 58 -
New features in the Pipeline
• Pig Pen, eclipse plug-in for developing and testing in Pig Latin
– Currently available for use in M/R mode
• Performance
– Expanding cases where combiner is used
– Map side join
– Improving order by
• Sampler
• Reducing number of M/R jobs from 3 to 2
– Efficient multi-store queries
• Better error handling
- 59 -
Pig Resources
• Pig is an Apache subproject
• Documentation
– General info: http://wiki.apache.org/pig/
– Pig UDFs: http://wiki.apache.org/pig/UDFManual
• Mailing lists
– [email protected]
• Source code
– https://svn.apache.org/repos/asf/hadoop/pig/trunk
• Code submissions
– https://issues.apache.org/jira/browse/PIG-*
Additional Material
- 61 -
Pig Streaming
• The entire portion of the dataset that corresponds to an alias is sent to the external task, and its output streams back out
– Use the stream keyword
• Example using Python and Unix commands in Pig to process records:
grunt> cat data/test10.txt;
Apple is the best fruit. 80
Plum is the best fruit. 20
grunt> a = load 'data/test10.txt';
grunt> b = stream a through `python -c "import sys; print sys.stdin.read().replace('Apple','Orange');"`;
grunt> dump b;
(Orange is the best fruit., 80)
(Plum is the best fruit., 20)
()
grunt> c = stream a through `cut -f2,2`;
grunt> dump c;
(80)
(20)
- 62 -
Pig Streaming using Perl/Python programs

• Invoke a Perl or Python script from Pig and execute it
py_test.py:
#!/usr/local/bin/python
import sys; print sys.stdin.read().replace('Apple','Orange');
$ chmod 700 py_test.py
grunt> d = stream a through `py_test.py`;
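Streaming commands can also be bound to a name with define and shipped to the cluster's task nodes; a minimal sketch, assuming py_test.py sits in the current directory:

grunt> define replace_fruit `py_test.py` ship('py_test.py');   -- ship copies the script to the cluster
grunt> e = stream a through replace_fruit;
grunt> dump e;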
- 63 -
Parameter Substitution in Pig

• Provides the ability to have parameters within a Pig script and to supply values for these parameters at run time
• Parameter values can be provided in a file or on the command line
• Parameter values can also be generated at run time by running a binary or a script

$ pig -param date=20080202
$ pig -param_file my_param_file
grunt> %default date '20080101'
grunt> A = load '/data/mydata/$date';
Using declare:
grunt> %declare date '20080101'
grunt> %declare CMD `generate_date_script`
-- this runs a script called generate_date_script
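As a minimal, hypothetical end-to-end illustration (the script name, paths, and parameter are assumptions), a script daily.pig parameterized on a date:

-- daily.pig
%default date '20080101'                    -- used when no -param value is supplied
A = load '/data/mydata/$date';
B = filter A by $0 is not null;
store B into '/user/viraj/output/$date';

Running it as, for example, pig -param date=20080202 daily.pig substitutes 20080202 for every occurrence of $date before the script executes.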
Translating SQL queries into Pig Latin
- 65 -
Constructs
SQL construct → Pig construct, with examples:

• From table → Load file(s)
  SQL: from X;
  Pig:  A = load 'mydata' using PigStorage('\t') as (col1, col2, col3);
• Select → Foreach … generate
  SQL: select col1 + col2, col3 …
  Pig:  B = foreach A generate col1 + col2, col3;
• Where → Filter
  SQL: select col1 + col2, col3 from X where col2 > 2;
  Pig:  C = filter B by col2 > '2';
- 66 -
Constructs

SQL construct → Pig construct, with examples:

• Group by → Group + foreach … generate
  SQL: select col1, col2, sum(col3) from X group by col1, col2;
  Pig:  D = group A by (col1, col2);
        E = foreach D generate flatten(group), SUM(A.col3);
• Having → Filter
  SQL: select col1, sum(col2) from X group by col1 having sum(col2) > 5;
  Pig:  F = filter E by $1 > '5';
• Order By → Order … By
  SQL: select col1, sum(col2) from X group by col1 order by col1;
  Pig:  H = ORDER E by $0;
- 67 -
Constructs

SQL construct → Pig construct, with examples:

• Distinct → Distinct
  SQL: select distinct col1 from X;
  Pig:  I = foreach A generate col1;
        J = distinct I;
• Distinct Agg → Distinct in foreach
  SQL: select col1, count(distinct col2) from X group by col1;
  Pig:  K = foreach D {
            L = distinct A.col2;
            generate flatten(group), COUNT(L); }
- 68 -
Constructs

SQL construct → Pig construct, with examples:

• Join → Cogroup + flatten (shortcut: JOIN)
  SQL: select A.col1, B.col3 from A join B using (col1);
  Pig:  A = load 'data1' using PigStorage('\t') as (col1, col2);
        B = load 'data2' using PigStorage('\t') as (col1, col3);
        C = cogroup A by col1 inner, B by col1 inner;
        D = foreach C generate flatten(A), flatten(B);
        E = foreach D generate A.col1, B.col3;
- 69 -