dataiku pig - hive - cascading

Pig Hive Cascading Hadoop In Practice Devoxx 2013 Florian Douetteau

Upload: dataiku

Post on 10-May-2015


Page 1: Dataiku   pig - hive - cascading

Pig Hive Cascading Hadoop In Practice

• Devoxx 2013
• Florian Douetteau

Page 2: Dataiku   pig - hive - cascading

About me

Florian Douetteau

• CEO at Dataiku
• Freelance at Criteo (Online Ads)
• CTO at IsCool Ent. (#1 French Social Gamer)
• VP R&D Exalead (Search Engine Technology)

4/14/13 Dataiku Training – Hadoop for Data Science

Page 3: Dataiku   pig - hive - cascading

Agenda

• Hadoop and Context (->0:03)
• Pig, Hive, Cascading, … (->0:06)
• How they work (->0:09)
• Comparing the tools (->0:25)
• Wrap-up and questions (->0:)

Dataiku - Pig, Hive and Cascading

Page 4: Dataiku   pig - hive - cascading

CHOOSE TECHNOLOGY

[Figure: a map of data technologies grouped into "countries"]

Regions on the map: Machine Learning Mystery Land, Scalability Central, NoSQL-Slavia, SQL Columnar Republic, Visualization County, Data Clean Wasteland, Statistician Old House

Technologies on the map: Hadoop, Ceph, Sphere, Cassandra, Spark, Scikit-Learn, Mahout, WEKA, MLBase, LibSVM, SAS, RapidMiner, SPSS, Pandas, R, QlikView, Tableau, SpotFire, HTML5/D3, InfiniDB, Vertica, GreenPlum, Impala, Netezza, Elastic Search, SOLR, MongoDB, Riak, Membase, Pig, Cascading, Talend

Page 5: Dataiku   pig - hive - cascading

How do I (pre)process data?

[Figure: data flow from raw data sources to a predictor]

Data sources:
• Implicit User Data (Views, Searches…)
• Content Data (Title, Categories, Price, …)
• Explicit User Data (Click, Buy, …)
• User Information (Location, Graph…)

Data volumes shown: 500TB, 50TB, 1TB, 200GB

Derived data and components:
• Transformation Matrix
• Transformation Predictor
• Per User Stats
• Per Content Stats
• User Similarity
• Rank Predictor
• Content Similarity
• A/B Test Data
• Predictor Runtime
• Online User Information

Page 6: Dataiku   pig - hive - cascading

Typical Use Case 1: Web Analytics Processing

• Analyse Raw Logs (Trackers, Web Logs)
• Extract IP, Page, …
• Detect and remove robots
• Build Statistics
  ◦ Number of page views, per product
  ◦ Best Referers
  ◦ Traffic Analysis
  ◦ Funnel
  ◦ SEO Analysis
  ◦ …

Page 7: Dataiku   pig - hive - cascading

Typical Use Case 2: Mining Search Logs for Synonyms

• Extract Query Logs
• Perform query normalization
• Compute Ngrams
• Compute Search “Sessions”
• Compute Log-Likelihood Ratio for ngrams across sessions
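The normalization and n-gram steps can be sketched in a few lines of Python. This is only an illustration, not the pipeline from the talk: the normalization rule (lowercasing and splitting on whitespace) and the function name `ngrams` are assumptions made for this example.

```python
def ngrams(query, n):
    """Return the word n-grams of a search query.

    Normalization here is an assumption: lowercase, then split on whitespace.
    """
    tokens = query.lower().split()
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


# Hypothetical query taken from a search log.
bigrams = ngrams("Cheap  Flights Paris", 2)
unigrams = ngrams("Cheap  Flights Paris", 1)
```

On Hadoop the same function would typically run inside a UDF or a mapper, one query per record.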

Page 8: Dataiku   pig - hive - cascading

Typical Use Case 3: Product Recommender

• Compute User – Product Association Matrix
• Compute different similarity ratios (Ochiai, Cosine, …)
• Filter out bad predictions
• For each user, select best recommendable products
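As a small illustration of the similarity step, the Ochiai coefficient between two products can be computed from the sets of users associated with each product: |A ∩ B| / sqrt(|A| · |B|). The product and user identifiers below are hypothetical, and the sketch ignores the scale problem that motivates Hadoop in the first place.

```python
from math import sqrt


def ochiai(users_a, users_b):
    """Ochiai coefficient between two sets of users."""
    if not users_a or not users_b:
        return 0.0
    return len(users_a & users_b) / sqrt(len(users_a) * len(users_b))


# Hypothetical association data: product -> set of users who bought it.
buyers = {
    "p1": {"u1", "u2", "u3", "u4"},
    "p2": {"u1", "u2"},
    "p3": {"u5"},
}

sim_p1_p2 = ochiai(buyers["p1"], buyers["p2"])  # 2 shared users out of 4 and 2
sim_p1_p3 = ochiai(buyers["p1"], buyers["p3"])  # no shared user
```

For binary (bought / not bought) associations, the Ochiai coefficient coincides with the cosine similarity of the two product columns.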

Page 9: Dataiku   pig - hive - cascading

Agenda

• Hadoop and Context
• Pig, Hive, Cascading, …
• How they work
• Comparing the tools

Page 10: Dataiku   pig - hive - cascading

Pig History

• Yahoo Research in 2006
• Inspired by Sawzall, a Google paper from 2003
• 2007 as an Apache Project
• Initial motivation
  ◦ Search Log Analytics: how long is the average user session? How many links does a user click on before leaving a website? How do click patterns vary in the course of a day/week/month? …

words = LOAD '/training/hadoop-wordcount/output' USING PigStorage('\t')
    AS (word:chararray, count:int);
sorted_words = ORDER words BY count DESC;
first_words = LIMIT sorted_words 10;
DUMP first_words;

Page 11: Dataiku   pig - hive - cascading

Hive History

• Developed by Facebook in January 2007
• Open source in August 2008
• Initial Motivation
  ◦ Provide a SQL-like abstraction to perform statistics on status updates

create external table wordcounts (
    word string,
    count int
) row format delimited fields terminated by '\t'
location '/training/hadoop-wordcount/output';

select * from wordcounts order by count desc limit 10;

select SUM(count) from wordcounts where word like 'th%';

Page 12: Dataiku   pig - hive - cascading

Cascading History

• Authored by Chris Wensel (2008)
• Associated Projects
  ◦ Cascalog: Cascading in Clojure
  ◦ Scalding: Cascading in Scala (Twitter in 2012)
  ◦ Lingual (to be released soon): SQL layer on top of Cascading

Page 13: Dataiku   pig - hive - cascading

Agenda

• Hadoop and Context
• Pig, Hive, Cascading, …
• How they work
• Comparing the tools

Page 14: Dataiku   pig - hive - cascading

MapReduce Simplicity is a complexity

4/14/13 Dataiku - Innovation Services 14

Page 15: Dataiku   pig - hive - cascading

Pig & Hive: Mapping to MapReduce jobs

* VAT excluded

events = LOAD '/events' USING PigStorage('\t')
    AS (type:chararray, user:chararray, price:int, timestamp:int);

events_filtered = FILTER events BY type;

by_user = GROUP events_filtered BY user;

price_by_user = FOREACH by_user GENERATE type, SUM(price) AS total_price, MAX(timestamp) as max_ts;

high_pbu = FILTER price_by_user BY total_price > 1000;

Job 1 (Mapper, then Reducer): LOAD → FILTER → GROUP → FOREACH → FILTER, with a shuffle and sort by user between the mapper and the reducer

Page 16: Dataiku   pig - hive - cascading

Pig & Hive: Mapping to MapReduce jobs

events = LOAD '/events' USING PigStorage('\t')
    AS (type:chararray, user:chararray, price:int, timestamp:int);

events_filtered = FILTER events BY type;

by_user = GROUP events_filtered BY user;

price_by_user = FOREACH by_user GENERATE type, SUM(price) AS total_price, MAX(timestamp) as max_ts;

high_pbu = FILTER price_by_user BY total_price > 1000;

recent_high = ORDER high_pbu BY max_ts DESC;

STORE recent_high INTO '/output';

Job 1 (Mapper, then Reducer): LOAD → FILTER → GROUP → FOREACH → FILTER, with a shuffle and sort by user between the mapper and the reducer

Job 2 (Mapper, then Reducer): LOAD (from tmp) → STORE, with a shuffle and sort by max_ts between the mapper and the reducer
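The two-job plan can be mimicked with plain Python to make the job boundaries visible. This is a rough sketch, not what Pig generates: the sample events are hypothetical, and because the slide leaves the first FILTER predicate unspecified, the condition `type == "buy"` is an assumption made for this example.

```python
from collections import defaultdict

# Hypothetical events: (type, user, price, timestamp).
events = [
    ("buy", "alice", 800, 10),
    ("buy", "alice", 400, 30),
    ("view", "alice", 0, 40),
    ("buy", "bob", 1500, 20),
    ("buy", "carol", 100, 50),
]


def job1(rows):
    # Map side: LOAD + FILTER (the predicate is an assumption, see above).
    mapped = [(user, (price, ts)) for typ, user, price, ts in rows if typ == "buy"]
    # Shuffle and sort by user: all values of one user reach the same reducer.
    groups = defaultdict(list)
    for user, value in mapped:
        groups[user].append(value)
    # Reduce side: FOREACH (SUM, MAX) + FILTER on total_price.
    out = []
    for user in sorted(groups):
        total_price = sum(price for price, _ in groups[user])
        max_ts = max(ts for _, ts in groups[user])
        if total_price > 1000:
            out.append((user, total_price, max_ts))
    return out


def job2(rows):
    # Second job: a new shuffle, this time sorted by max_ts (descending), then STORE.
    return sorted(rows, key=lambda row: row[2], reverse=True)


recent_high = job2(job1(events))
```

The ORDER BY needs a different shuffle key (max_ts) than the GROUP BY (user), which is why it cannot be folded into the first job.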

Page 17: Dataiku   pig - hive - cascading

Pig: How does it work

Data Execution Plan compiled into 10 map reduce jobs executed in parallel (or not)

TResolution = LOAD '$PREFIX/dwh_dim_external_tracking_resolution/dt=$DAY' USING PigStorage('\u0001');
TResolution = FOREACH TResolution GENERATE $0 AS SKResolutionId, $1 as ResolutionId;

TSiteMap = LOAD '$PREFIX/dwh_dim_sitemapnode/dt=$DAY' USING PigStorage('\u0001');
TSiteMap = FOREACH TSiteMap GENERATE $0 AS SKSimteMapNodeId, $2 as SiteMapNodeId;

TCustomer = LOAD '$PREFIX/customer_relation/dt=$DAY' USING PigStorage('\u0001')
    as (SKCustomerId:chararray,
        CustomerId:chararray);

F1 = FOREACH F1 GENERATE *, (date_time IS NOT NULL ? CustomFormatToISO(date_time, 'yyyy-MM-dd HH:mm:ss'

F2 = FOREACH F1 GENERATE *,
    CONCAT(CONCAT(CONCAT(CONCAT(visid_high,'-'), visid_low), '-'), visit_num) as VisitId,
    (referrer matches '.*cdiscount.com.*' OR referrer matches 'cdscdn.com' ? NULL :referrer ) as Referrer,
    (iso IS NOT NULL ? ISODaysBetween(iso, '1899-12-31T00:00:00') : NULL)
        AS SkDateId,
    (iso IS NOT NULL ? ISOSecondsBetween(iso, ISOToDay(iso)) : NULL)
        AS SkTimeId,
    ((event_list is not null and event_list matches '.*\\b202\\b.*') ? 'Y' : 'N') as is_202,
    ((event_list is not null and event_list matches '.*\\b10\\b.*') ? 'Y' : 'N') as is_10,
    ((event_list is not null and event_list matches '.*\\b12\\b.*') ? 'Y' : 'N') as is_12,
    ((event_list is not null and event_list matches '.*\\b13\\b.*') ? 'Y' : 'N') as is_13,
    ((event_list is not null and event_list matches '.*\\b14\\b.*') ? 'Y' : 'N') as is_14,
    ((event_list is not null and event_list matches '.*\\b11\\b.*') ? 'Y' : 'N') as is_11,
    ((event_list is not null and event_list matches '.*\\b1\\b.*') ? 'Y' : 'N') as is_1,
    REGEX_EXTRACT(pagename, 'F-(.*)\\:.*', 1) AS ProductReferenceId,
    NULL AS OriginFile;

SET DEFAULT_PARALLEL 24;

F3 = JOIN F2 BY post_search_engine LEFT, TSearchEngine BY SearchEngineId USING 'replicated' PARALLEL 20 ;
F3 = FOREACH F3 GENERATE *, (SKSearchEngineId IS NULL ? '-1' : SKSearchEngineId) as SKSearchEngineId;
--F3 = FOREACH F2 GENERATE *, NULL AS SKSearchEngineId, NULL AS SearchEngineId;

F4 = JOIN F3 BY browser LEFT, TBrowser BY BrowserId USING 'replicated' PARALLEL 20;
F4 = FOREACH F4 GENERATE *, (SKBrowserId IS NULL ? '-1' : SKBrowserId) as SKBrowserId;

--F4 = FOREACH F3 GENERATE *, NULL AS SKBrowserId, NULL AS BrowserId;

F5 = JOIN F4 BY os LEFT, TOperatingSystem BY OperatingSystemId USING 'replicated' PARALLEL 20;
F5 = FOREACH F5 GENERATE *, (SKOperatingSystemId IS NULL ? '-1' : SKOperatingSystemId) as SKOperatingSystemId;

--F5 = FOREACH F4 GENERATE *, NULL AS SKOperatingSystemId, NULL AS OperatingSystemId;

F6 = JOIN F5 BY resolution LEFT, TResolution BY ResolutionId USING 'replicated' PARALLEL 20;
F6 = FOREACH F6 GENERATE *, (SKResolutionId IS NULL ? '-1' : SKResolutionId) as SKResolutionId;

--F6 = FOREACH F5 GENERATE *, NULL AS SKResolutionId, NULL AS ResolutionId;

F7 = JOIN F6 BY post_evar4 LEFT, TSiteMap BY SiteMapNodeId USING 'replicated' PARALLEL 20;
F7 = FOREACH F7 GENERATE *, (SKSimteMapNodeId IS NULL ? '-1' : SKSimteMapNodeId) as SKSimteMapNodeId;

--F7 = FOREACH F6 GENERATE *, NULL AS SKSimteMapNodeId, NULL AS SiteMapNodeId;

SPLIT F7 INTO WITHOUT_CUSTOMER IF post_evar30 IS NULL, WITH_CUSTOMER IF post_evar30 IS NOT NULL;

F8 = JOIN WITH_CUSTOMER BY post_evar30 LEFT, TCustomer BY CustomerId USING 'skewed' PARALLEL 20;
WITHOUT_CUSTOMER = FOREACH WITHOUT_CUSTOMER GENERATE *, NULL as SKCustomerId, NULL as CustomerId;

--F8_UNION = FOREACH F7 GENERATE *, NULL as SKCustomerId, NULL as CustomerId;
F8_UNION = UNION F8, WITHOUT_CUSTOMER;
--DESCRIBE F8;
--DESCRIBE WITHOUT_CUSTOMER;
--DESCRIBE F8_UNION;

F9 = FOREACH F8_UNION GENERATE
    visid_high,
    visid_low,
    VisitId,
    post_evar30,
    SKCustomerId,
    visit_num,
    SkDateId,
    SkTimeId,
    post_evar16,
    post_evar52,
    visit_page_num,
    is_202,
    is_10,
    is_12,

Page 18: Dataiku   pig - hive - cascading

Cascading: From Code To Jobs

Page 19: Dataiku   pig - hive - cascading

Hive Joins: How to join with MapReduce?

Mappers output (each row is tagged with the index of its source table):

tbl_idx | uid | name
1       | 1   | Dupont
1       | 2   | Durand

tbl_idx | uid | type
2       | 1   | Type1
2       | 1   | Type2
2       | 2   | Type1

Shuffle by uid, sort by (uid, tbl_idx)

Reducer 1 input:

Uid | Tbl_idx | Name   | Type
1   | 1       | Dupont |
1   | 2       |        | Type1
1   | 2       |        | Type2

Reducer 2 input:

Uid | Tbl_idx | Name   | Type
2   | 1       | Durand |
2   | 2       |        | Type1

Reducer 1 output:

Uid | Name   | Type
1   | Dupont | Type1
1   | Dupont | Type2

Reducer 2 output:

Uid | Name   | Type
2   | Durand | Type1
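The reduce-side join shown on this slide can be reproduced with a short Python simulation using the same two tables. It is a minimal sketch of the technique (tag, shuffle by key, sort by table index, combine in the reducer), not Hive's actual implementation; the function name `reduce_side_join` is made up for this example.

```python
from collections import defaultdict

names = [(1, "Dupont"), (2, "Durand")]              # table 1: (uid, name)
types = [(1, "Type1"), (1, "Type2"), (2, "Type1")]  # table 2: (uid, type)


def reduce_side_join(left, right):
    # Map: tag every record with the index of the table it comes from.
    tagged = [(uid, 1, name) for uid, name in left]
    tagged += [(uid, 2, typ) for uid, typ in right]

    # Shuffle by uid, sort by (uid, tbl_idx): rows from table 1 arrive first.
    partitions = defaultdict(list)
    for uid, tbl_idx, value in sorted(tagged, key=lambda r: (r[0], r[1])):
        partitions[uid].append((tbl_idx, value))

    # Reduce: buffer the table-1 rows, then pair each table-2 row with them.
    joined = []
    for uid, records in partitions.items():
        buffered_names = []
        for tbl_idx, value in records:
            if tbl_idx == 1:
                buffered_names.append(value)
            else:
                joined.extend((uid, name, value) for name in buffered_names)
    return joined


result = reduce_side_join(names, types)
```

Sorting by table index is what lets the reducer buffer only one side of the join while it streams through the other.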

Page 20: Dataiku   pig - hive - cascading

Agenda

• Hadoop and Context
• Pig, Hive, Cascading, …
• How they work
• Comparing the tools

Page 21: Dataiku   pig - hive - cascading

Comparing without Comparable

• Philosophy
  ◦ Procedural Vs Declarative
  ◦ Data Model and Schema
• Productivity
  ◦ Headachability
  ◦ Checkpointing
  ◦ Testing and environment
• Integration
  ◦ Partitioning
  ◦ Formats Integration
  ◦ External Code Integration
• Performance and optimization

Page 22: Dataiku   pig - hive - cascading

Procedural Vs Declarative

• Transformation as a sequence of operations
• Transformation as a set of formulas

insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join (
    select name, ipaddr
    from users join clicks on (users.name = clicks.user)
    where value > 0
) using ipaddr
group by dma;

Users = load 'users' as (name, age, ipaddr);
Clicks = load 'clicks' as (user, url, value);
ValuableClicks = filter Clicks by value > 0;
UserClicks = join Users by name, ValuableClicks by user;
Geoinfo = load 'geoinfo' as (ipaddr, dma);
UserGeo = join UserClicks by ipaddr, Geoinfo by ipaddr;
ByDMA = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';

Page 23: Dataiku   pig - hive - cascading

Data type and Model: Rationale

• All three extend the basic data model with extended data types
  ◦ array-like [ event1, event2, event3 ]
  ◦ map-like { type1:value1, type2:value2, … }
• Different approaches
  ◦ Resilient Schema
  ◦ Static Typing
  ◦ No Static Typing

Page 24: Dataiku   pig - hive - cascading

Hive: Data Type and Schema

Simple type                    | Details
TINYINT, SMALLINT, INT, BIGINT | 1, 2, 4 and 8 bytes
FLOAT, DOUBLE                  | 4 and 8 bytes
BOOLEAN                        |
STRING                         | Arbitrary-length, replaces VARCHAR
TIMESTAMP                      |

Complex type | Details
ARRAY        | Array of typed items (0-indexed)
MAP          | Associative map
STRUCT       | Complex class-like objects

CREATE TABLE visit (
    user_name STRING,
    user_id INT,
    user_details STRUCT<age:INT, zipcode:INT>
);

Page 25: Dataiku   pig - hive - cascading

Pig: Data types and Schema

rel = LOAD '/folder/path/' USING PigStorage('\t')
    AS (col:type, col:type, col:type);

Simple type              | Details
int, long, float, double | 32 and 64 bits, signed
chararray                | A string
bytearray                | An array of … bytes
boolean                  | A boolean

Complex type | Details
tuple        | a tuple is an ordered fieldname:value map
bag          | a bag is a set of tuples

Page 26: Dataiku   pig - hive - cascading

Cascading: Data Type and Schema

• Support for any Java types, provided they can be serialized in Hadoop
• No support for typing

Simple type              | Details
Int, Long, Float, Double | 32 and 64 bits, signed
String                   | A string
byte[]                   | An array of … bytes
Boolean                  | A boolean

Complex type | Details
Object       | Object must be « Hadoop serializable »

Page 27: Dataiku   pig - hive - cascading

Style Summary

          | Style       | Typing                                       | Data Model                             | Metadata store
Pig       | Procedural  | Static + Dynamic                             | scalar + tuple + bag (fully recursive) | No (HCatalog)
Hive      | Declarative | Static + Dynamic, enforced at execution time | scalar + list + map                    | Integrated
Cascading | Procedural  | Weak                                         | scalar + java objects                  | No

Page 28: Dataiku   pig - hive - cascading

Comparing without Comparable

• Philosophy
  ◦ Procedural Vs Declarative
  ◦ Data Model and Schema
• Productivity
  ◦ Headachability
  ◦ Checkpointing
  ◦ Testing, error management and environment
• Integration
  ◦ Partitioning
  ◦ Formats Integration
  ◦ External Code Integration
• Performance and optimization

Page 29: Dataiku   pig - hive - cascading

Headachability: Motivation

• Does debugging the tool lead to bad headaches?

Page 30: Dataiku   pig - hive - cascading

Headaches: Pig

• Out Of Memory Error (Reducer)
• Exception in Building / Extended Functions (handling of null)
• Null vs “”
• Nested Foreach and scoping
• Date Management (pig 0.10)
• Field implicit ordering

Page 31: Dataiku   pig - hive - cascading

A Pig Error


Page 32: Dataiku   pig - hive - cascading

Headaches: Hive

• Out of Memory Errors in Reducers
• Few Debugging Options
• Null / “”
• No builtin “first”

Page 33: Dataiku   pig - hive - cascading

Headaches: Cascading

• Weak Typing Errors (comparing Int and String …)
• Illegal Operation Sequence (Group after group …)
• Field Implicit Ordering

Page 34: Dataiku   pig - hive - cascading

Testing: Motivation

• How to perform unit tests?
• How to have different versions of the same script (parameter)?

Page 35: Dataiku   pig - hive - cascading

Testing: Pig

• System Variables
• Comment to test
• No Meta Programming
• pig -x local to execute on local files

Page 36: Dataiku   pig - hive - cascading

Testing / Environment: Cascading

• JUnit tests are possible
• Ability to use code to actually comment out some variables

Page 37: Dataiku   pig - hive - cascading

Checkpointing: Motivation

• Lots of iteration while developing on Hadoop
• Sometimes jobs fail
• Sometimes you need to restart from the start …

[Figure: flow with stages Parse Logs, Per Page Stats, Page User Correlation, Filtering, Output; annotation: FIX and relaunch]

Page 38: Dataiku   pig - hive - cascading

Pig: Manual Checkpointing

• STORE command to manually store files

[Figure: flow with stages Parse Logs, Per Page Stats, Page User Correlation, Filtering, Output; annotation: // COMMENT beginning of script and relaunch]

Page 39: Dataiku   pig - hive - cascading

Cascading: Automated Checkpointing

• Ability to re-run a flow automatically from the last saved checkpoint

addCheckpoint(…)

Page 40: Dataiku   pig - hive - cascading

Cascading: Topological Scheduler

• Check the timestamp of each intermediate file
• Execute a step only if its input is more recent

[Figure: flow with stages Parse Logs, Per Page Stats, Page User Correlation, Filtering, Output]
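The timestamp rule can be illustrated with a make-style check in Python. This is not Cascading's API or its scheduler; the step names, the `stale_steps` function and the integer timestamps are all hypothetical, and the sketch only shows the decision logic: a step runs when its output is missing or older than one of its inputs, and a re-run step forces everything downstream to run too.

```python
def stale_steps(steps, mtimes):
    """Return the names of the steps that must be (re)executed.

    steps:  list of (name, input_paths, output_path), in topological order.
    mtimes: dict mapping a path to its last modification time;
            a missing path means the file does not exist yet.
    """
    mtimes = dict(mtimes)  # work on a copy
    to_run = []
    for name, inputs, output in steps:
        newest_input = max(mtimes.get(path, float("inf")) for path in inputs)
        output_time = mtimes.get(output)
        if output_time is None or newest_input > output_time:
            to_run.append(name)
            # The output is about to be rewritten: treat it as newer than anything.
            mtimes[output] = float("inf")
    return to_run


# Hypothetical flow and timestamps: "parsed" was rebuilt after "stats".
flow = [
    ("parse_logs", ["raw"], "parsed"),
    ("per_page_stats", ["parsed"], "stats"),
    ("filtering", ["stats"], "filtered"),
]
timestamps = {"raw": 5, "parsed": 10, "stats": 8, "filtered": 20}
plan = stale_steps(flow, timestamps)
```

Here `parse_logs` is skipped because its output is newer than its input, while the two downstream steps are scheduled again.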

Page 41: Dataiku   pig - hive - cascading

Productivity Summary

          | Headaches                          | Checkpointing/Replay           | Testing / Metaprogramming
Pig       | Lots                               | Manual Save                    | Difficult
Hive      | Few, but without debugging options | None (That’s SQL)              | None (That’s SQL)
Cascading | Weak Typing, Complexity            | Checkpointing, Partial Updates | Possible

Page 42: Dataiku   pig - hive - cascading

Comparing without Comparable

• Philosophy
  ◦ Procedural Vs Declarative
  ◦ Data Model and Schema
• Productivity
  ◦ Headachability
  ◦ Checkpointing
  ◦ Testing and environment
• Integration
  ◦ Formats Integration
  ◦ Partitioning
  ◦ External Code Integration
• Performance and optimization

Page 43: Dataiku   pig - hive - cascading

Formats Integration: Motivation

• Ability to integrate different file formats
  ◦ Text Delimited
  ◦ Sequence File (Binary Hadoop format)
  ◦ Avro, Thrift, …
• Ability to integrate with external data sources or sinks (MongoDB, ElasticSearch, Database, …)

Format impact on size and performance

Format                     | Size on Disk (GB) | Hive processing time (24 cores)
Text File, uncompressed    | 18.7              | 1m32s
1 Text File, Gzipped       | 3.89              | 6m23s (no parallelization)
JSON compressed            | 7.89              | 2m42s
Multiple text files, gzipped | 4.02            | 43s
Sequence File, Block, Gzip | 5.32              | 1m18s
Text File, LZO Indexed     | 7.03              | 1m22s

Page 44: Dataiku   pig - hive - cascading

Format Integration

• Hive: SerDe (Serializer-Deserializer)
• Pig: Storage
• Cascading: Tap

Page 45: Dataiku   pig - hive - cascading

Partitions: Motivation

• No support for “UPDATE” patterns; any increment is performed by adding or deleting a partition
• Common partition schemas on Hadoop
  ◦ By Date: /apache_logs/dt=2013-01-23
  ◦ By Data center: /apache_logs/dc=redbus01/…
  ◦ By Country
  ◦ …
  ◦ Or any combination of the above

Page 46: Dataiku   pig - hive - cascading

Hive Partitioning: Partitioned tables

CREATE TABLE event (
    user_id INT,
    type STRING,
    message STRING)
PARTITIONED BY (day STRING, server_id STRING);

Disk structure
/hive/event/day=2013-01-27/server_id=s1/file0
/hive/event/day=2013-01-27/server_id=s1/file1
/hive/event/day=2013-01-27/server_id=s2/file0
/hive/event/day=2013-01-27/server_id=s2/file1
…
/hive/event/day=2013-01-28/server_id=s2/file0
/hive/event/day=2013-01-28/server_id=s2/file1

INSERT OVERWRITE TABLE event PARTITION(day='2013-01-27', server_id='s1')
SELECT * FROM event_tmp;
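The key=value directory layout is simple enough to build and parse by hand, which is useful when a tool without native partition support has to read or write such data. The helpers below are a hypothetical sketch (the function names are not part of Hive or any library); they only encode the naming convention visible in the disk structure above.

```python
def partition_path(table_root, partition):
    """Build a partition directory from an ordered list of (key, value) pairs."""
    parts = [f"{key}={value}" for key, value in partition]
    return "/".join([table_root.rstrip("/")] + parts)


def parse_partition(path):
    """Extract the key=value components of a partitioned path, in order."""
    return [tuple(part.split("=", 1)) for part in path.split("/") if "=" in part]


path = partition_path("/hive/event", [("day", "2013-01-27"), ("server_id", "s1")])
keys = parse_partition("/hive/event/day=2013-01-28/server_id=s2/file0")
```

The order of the pairs matters: it has to follow the order of the columns in the PARTITIONED BY clause.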

Page 47: Dataiku   pig - hive - cascading

Cascading: Partition

• No direct support for partitions
• Support for a “Glob” Tap, to read from files using patterns
• → You can code your own custom or virtual partition schemes

Page 48: Dataiku   pig - hive - cascading

External Code Integration: Simple UDF

[Figure: UDF code examples for Pig, Hive and Cascading]

Page 49: Dataiku   pig - hive - cascading

Hive Complex UDF (Aggregators)


Page 50: Dataiku   pig - hive - cascading

Cascading Direct Code Evaluation


Page 51: Dataiku   pig - hive - cascading

Integration Summary

          | Partition/Incremental Updates | External Code                                             | Format Integration
Pig       | No Direct Support             | Simple                                                    | Doable and rich community
Hive      | Fully integrated, SQL-like    | Very simple, but complex dev setup                        | Doable and existing community
Cascading | With Coding                   | Complex UDFs but regular, and Java Expression embeddable  | Doable and growing community

Page 52: Dataiku   pig - hive - cascading

Comparing without Comparable

• Philosophy
  ◦ Procedural Vs Declarative
  ◦ Data Model and Schema
• Productivity
  ◦ Headachability
  ◦ Checkpointing
  ◦ Testing and environment
• Integration
  ◦ Formats Integration
  ◦ Partitioning
  ◦ External Code Integration
• Performance and optimization

Page 53: Dataiku   pig - hive - cascading

Optimization

• Several Common MapReduce Optimization Patterns
  ◦ Combiners
  ◦ MapJoin
  ◦ Job Fusion
  ◦ Job Parallelism
  ◦ Reducer Parallelism
• Different support per framework
  ◦ Fully Automatic
  ◦ Pragma / Directives / Options
  ◦ Coding style / Code to write

Page 54: Dataiku   pig - hive - cascading

Combiner: Perform Partial Aggregate at Mapper Stage

SELECT date, COUNT(*) FROM product GROUP BY date

Map input, split 1:
2012-02-14 4354
…
2012-02-15 21we2

Map input, split 2:
2012-02-14 qa334
…
2012-02-15 23aq2

Map output: the same rows, passed through unchanged for both splits

Reduce output:
2012-02-14 20
2012-02-15 35
2012-02-16 1

Page 55: Dataiku   pig - hive - cascading

Combiner: Perform Partial Aggregate at Mapper Stage

SELECT date, COUNT(*) FROM product GROUP BY date

Map input, split 1:
2012-02-14 4354
…
2012-02-15 21we2

Map input, split 2:
2012-02-14 qa334
…
2012-02-15 23aq2

Map output after the combiner, split 1:
2012-02-14 12
2012-02-15 23
2012-02-16 1

Map output after the combiner, split 2:
2012-02-14 8
2012-02-15 12

Reduce output:
2012-02-14 20
2012-02-15 35
2012-02-16 1

Reduced network bandwidth. Better parallelism.
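The effect of a combiner on the GROUP BY above can be sketched in Python: each mapper pre-aggregates its own split, so only one record per distinct date leaves the mapper. The input rows are hypothetical and much smaller than the ones on the slide; the function names are made up for this example.

```python
from collections import Counter

# Hypothetical input splits of (date, product_id) rows, one list per mapper.
split_1 = [("2012-02-14", "4354"), ("2012-02-14", "a1"), ("2012-02-15", "21we2")]
split_2 = [("2012-02-14", "qa334"), ("2012-02-16", "23aq2")]


def map_with_combiner(split):
    # Map emits (date, 1) per row; the combiner sums them locally,
    # so each mapper ships one record per distinct date.
    return Counter(date for date, _ in split)


def reduce_counts(partials):
    # Reduce adds up the partial counts received from all mappers.
    totals = Counter()
    for partial in partials:
        totals.update(partial)
    return dict(totals)


partials = [map_with_combiner(split_1), map_with_combiner(split_2)]
counts = reduce_counts(partials)
shuffled_records = sum(len(partial) for partial in partials)
```

With these five input rows, four records cross the network instead of five; on real data with few distinct dates the reduction is far larger. This only works because COUNT can be computed from partial counts.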

Page 56: Dataiku   pig - hive - cascading

Join Optimization: Map Join

• Hive: set hive.auto.convert.join = true;
• Pig
• Cascading (no aggregation support after HashJoin)
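The idea behind a map join (called a replicated join in Pig and a HashJoin in Cascading) is to load the small table into memory on every mapper and stream the large table past it, so no shuffle or reduce phase is needed. The sketch below shows that idea in plain Python with hypothetical data; it is not the code any of the three tools generates.

```python
def map_join(big_rows, small_rows):
    """Inner join of (key, value) rows, done entirely on the map side."""
    # Build an in-memory hash table from the small table (copied to every mapper).
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)
    # Stream the big table: each row is joined locally, without any shuffle.
    return [
        (key, big_value, small_value)
        for key, big_value in big_rows
        for small_value in lookup.get(key, [])
    ]


# Hypothetical data: a large click log and a small browser dimension table.
clicks = [("b1", "/home"), ("b2", "/cart"), ("b1", "/pay"), ("b9", "/home")]
browsers = [("b1", "Firefox"), ("b2", "Chrome")]
joined = map_join(clicks, browsers)
```

The approach only applies when the small table fits in the memory of each mapper.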

Page 57: Dataiku   pig - hive - cascading

Number of Reducers

• Critical for performance
• Estimated from the size of the input files
  ◦ Hive: divide the size by hive.exec.reducers.bytes.per.reducer (default 1GB)
  ◦ Pig: divide the size by pig.exec.reducers.bytes.per.reducer (default 1GB)
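The estimate described above amounts to a rounded-up division. The sketch below assumes 1GB means 10^9 bytes and adds an upper bound, `max_reducers`, whose value of 999 is an assumption of this example rather than something stated on the slide; the function name is also hypothetical.

```python
def estimate_reducers(input_bytes, bytes_per_reducer=10**9, max_reducers=999):
    """Estimate a reducer count from the total input size.

    bytes_per_reducer mirrors the *.exec.reducers.bytes.per.reducer setting;
    max_reducers is an illustrative cap (an assumption of this sketch).
    """
    needed = -(-input_bytes // bytes_per_reducer)  # ceiling division on integers
    return max(1, min(max_reducers, needed))


reducers_small = estimate_reducers(18_700_000_000)  # an 18.7 GB input
reducers_tiny = estimate_reducers(1)
reducers_huge = estimate_reducers(5 * 10**12)
```

Because the estimate only looks at the input size, a job whose filters discard most rows, or whose joins multiply them, may still need an explicit setting such as PARALLEL in Pig.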

Page 58: Dataiku   pig - hive - cascading

Performance & Optimization Summary

          | Combiner Optimization | Join Optimization    | Number of reducers optimization
Pig       | Automatic             | Option               | Estimate or DIY
Cascading | DIY                   | HashJoin             | DIY
Hive      | Partial DIY           | Automatic (Map Join) | Estimate or DIY

Page 59: Dataiku   pig - hive - cascading

Agenda

• Hadoop and Context (->0:03)
• Pig, Hive, Cascading, … (->0:06)
• How they work (->0:09)
• Comparing the tools (->0:25)
• Wrap-up and questions (->0:30)

Page 60: Dataiku   pig - hive - cascading

• Want to keep close to SQL?
  ◦ Hive
• Want to write large flows?
  ◦ Pig
• Want to integrate in large scale programming projects?
  ◦ Cascading (Cascalog / Scalding)