hadoop pig

54
The analytics stack Hadoop & Pig

Upload: sean-murphy

Post on 10-May-2015

4.632 views

Category:

Technology


1 download

DESCRIPTION

Lamine's presentation...

TRANSCRIPT

Page 1: Hadoop pig

The analytics stack Hadoop & Pig

Page 2: Hadoop pig

Hadoop Motivations. What is it? And high-level concepts

The Ecosystem. The MapReduce model & framework and HDFS

Programming with Hadoop

Pig What is it? Motivations

Model & components

Integration with Cassandra

Outline of the presentation

2

Page 3: Hadoop pig

3

Please interrupt and ask questions!

Page 4: Hadoop pig

CPU-intensive computations

Relatively small amount of data

Tightly-coupled applications

Highly concurrent I/O requirements

Complex message passing paradigms such as MPI, PVM…

Developers might need to spend some time designing for failure

4

Traditional HPC systems

Page 5: Hadoop pig

Challenges

Data and storage

Locality, computation close to the data

In large-scale systems, nodes fail

Mean time between failures: 1 node / 3 years, 1000 nodes / 1 day

Built-in fault-tolerance

Distributed programming is complex

Need a simple data-parallel programming model. Users would structure the application in high-level functions, the system distributes the data & jobs and handles communications and faults

5

Page 6: Hadoop pig

What requirements

A simple data-parallel programming model, designed for high scalability and resiliency

Scalability to large-scale data volumes

Automated fault-tolerance at application level rather than relying on high-availability hardware

Simplified I/O and tasks monitoring

All based on cost-efficient commodity machines (cheap, but unreliable), and commodity network

6

Page 7: Hadoop pig

Data spread in advance, persistent (in terms of locality), and replicated

No inter-dependencies / shared nothing architecture

Applications written in two pieces of code

And developers do not have to worry about the underlying issues in networking, jobs interdependencies, scheduling, etc…

7

Hadoop’s core concepts

Page 8: Hadoop pig

Hadoop originated from Apache Nutch, an open source web search engine

After the publications of the GFS and MapReduce papers, in 2003 & 2004, the Nutch developers decided to implement open source versions

In February 2006, it became Hadoop, with a dedicated team at Yahoo!

September 2007 - release 0.14.1

Last release 1.0.3 out last week

Used by a large number of companies including Facebook, LinkedIn, Twitter, hulu, among many others..

8

Where does it come from?

Page 9: Hadoop pig

The model

A map function processes a key/value pair to generate a set of intermediate key/value pairs

Divides the problem into smaller ‘intermediate key/value’ pairs

The reduce function merge all intermediate values associated with the same intermediate key

Run-time system takes care of:

Partitioning the input data across nodes (blocks/chunks typically of 64Mb to 128Mb)

Scheduling the data and execution. Maps operate on a single block.

Manages node failures, replication, re-submissions..

9

Page 10: Hadoop pig

Simple Word Count

♯key: offset, value: line

def mapper():

for line in open(“doc”):

for word in line.split():

output(word, 1)

♯key: a word, value: iterator over counts

def reducer():

output(key, sum(value))

10

Page 11: Hadoop pig

The Combiner

def combiner():

output(key, sum(values))

A combiner is a local aggregation function for repeated keys produced by the map

Works for associative functions like sum, count, max

Decreases the size of intermediate data / communications

map-side aggregation for word count:

11

Page 12: Hadoop pig

Some other basic examples…

Distributed Grep: Map function emits a line if it matches a supplied pattern Reduce function is an identity function that copies the supplied intermediate

data to the output

Count of URL accesses: Map function processes logs of web page requests and outputs <URL, 1> Reduce function adds together all values for the same URL, emitting <URL, total

count> pairs

Reverse Web-Link graph: Map function outputs <tgt, src> for each link to a tgt in a page named src Reduce concatenates the list of all src URLS associated with a given tgt URL and

emits the pair: <tgt, list(src)>

Inverted Index: Map function parses each document, emitting a sequence of <word, doc_ID> Reduce accepts all pairs for a given word and emits a <word, list(doc_ID)> pair

12

Page 13: Hadoop pig

from http://indoos.wordpress.com/2010/08/16/hadoop-ecosystem-world-map/

Hadoop Ecosystem

13 Core components

Page 14: Hadoop pig

Hadoop consists of two core components

The MapReduce framework, and

The Hadoop Distributed File System

14

Hadoop components

MapReduce layer JobTracker

TaskTrackers

HDFS layer Namenode

Secondary namenode

Datanode Example of a typical physical distribution within a Hadoop cluster

Page 15: Hadoop pig

HDFS

Scalable and fault-tolerant. Based on Google’s GFS

Single namenode stores metadata (file names, block locations, etc.).

Files split into chunks, replicated across several datanodes (typically 3+). It is rack-aware

Optimised for large files, sequential streaming reads, rather than random

Files written once, no append

Namenode

Datanodes

1 2 3 4

1 2 4

2 1 3

1 4 3

3 2 4

File1

15

Page 16: Hadoop pig

HDFS API / HDFS FS Shell for command line* > hadoop fs –copyFromLocal local_dir hdfs_dir

> hadoop fs –copToLocal hdfs_dir local_dir

Tools

Flume: collects, aggregates and move log data from application servers to HDFS

Sqoop: HDFS import and export to SQL

16

HDFS

*http://hadoop.apache.org/common/docs/r0.20.0/hdfs_shell.html

Page 17: Hadoop pig

In Hadoop, a Job (full program) is a set of tasks

Each task (mapper or reducer) is attempted at least once, or multiple times if it crashes. Multiple attempts may also occur in parallel

The tasks run inside a separate JVM on the tasktracker

All the class files are assembled into a jar file, which will be uploaded into HDFS, before notifying the tasktracker

17

MapReduce execution

Page 18: Hadoop pig

18

MapReduce execution

MapReduce Job

Master

Split 0

Split 1

Split 2

Split 3

Split 4

Worker

Worker

Worker

Worker

Worker

Local write read

Output files

Intermediate files locally

Remote read

Input files

Page 19: Hadoop pig

Getting Started…

Multiple choices - Vanilla Apache version, or one of the numerous existing distros hadoop.apache.org

www.cloudera.com [A set of VMs is also provided]

http://www.karmasphere.com/

Three ways to write jobs in Hadoop: Java API

Hadoop Streaming (for Python, Perl, etc.)

Pipes API (C++)

19

Page 20: Hadoop pig

Word Count in Java

20

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf(WordCount.class);

conf.setJobName("wordcount");

conf.setMapperClass(MapClass.class);

conf.setCombinerClass(ReduceClass.class);

conf.setReducerClass(ReduceClass.class);

FileInputFormat.setInputPaths(conf, args[0]);

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

JobClient.runJob(conf);

}

Page 21: Hadoop pig

public class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable ONE = new IntWritable(1); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { out.collect(new text(itr.nextToken()), ONE); } } }

Word Count in Java – mapper

21

Page 22: Hadoop pig

Word Count in Java – reducer

public class ReduceClass extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } out.collect(key, new IntWritable(sum)); } }

22

Page 23: Hadoop pig

23

Getting keys and values

Input file

Input split Input split

RecordReader RecordReader Inp

ut

Form

at

Mapper Mapper

Reducer Reducer

RecordWriter RecordWriter

Output file Output file

Ou

tpu

t Fo

rmat

Page 24: Hadoop pig

Hadoop Streaming

#!/usr/bin/env python

import sys for line in sys.stdin: for word in line.split(): print "%s\t%s" % (word, 1)

#!/usr/bin/env python

import sys

dict={}

for line in sys.stdin:

word, count = line.split("\t", 1)

dict[word] = dict.get(word, 0) + int(count)

counts = dict.items()

for word, count in counts:

print "%s\t%s" % (word.lower(), count)

Mapper.py:

Reducer.py:

24

You can locally test your code on the command line: $> cat data | mapper | sort | reducer

Page 25: Hadoop pig

High-level tools

MapReduce is fairly low-level: must think about keys, values, partitioning, etc.

How to express parallel algorithms by a series of MapReduce jobs

Can be hard to capture common job building blocks

Different use cases require different tools!

25

Page 26: Hadoop pig

Apache Pig is a platform raising a level of abstraction for processing large datasets. Its language, Pig Latin is a simple query algebra expressing data transformations and applying functions to records

Started at Yahoo! Research, >60% of Hadoop jobs within Yahoo! are Pig jobs

26

Pig

Pig job submission

MapReduce jobs Hadoop / HDFS

Page 27: Hadoop pig

MapReduce requires a Java programmer

Solution was to abstract it and create a system where users are familiar with scripting languages

Other than very trivial applications, MapReduce requires multiple stages, leading to long development cycles

Rapid prototyping. Increased productivity

In MapReduce users have to reinvent common functionality (join, filter, etc.)

Pig provides them

27

Motivations

Page 28: Hadoop pig

Rapid prototyping of algorithms for processing large datasets

Log analysis

Ad hoc queries across various large datasets

Analytics (including through sampling)

Pig Mix provides a set of performance and scalability benchmarks. Currently 1.1 times MapReduce speed.

28

Used for

Page 29: Hadoop pig

Grunt, the Pig shell

Executing scripts directly

Embedding Pig in Java (using PigServer, similar to SQL using JDBC), or Python

A range of tools including Eclipse plug-ins

PigPen, Pig Editor…

29

Using Pig

Page 30: Hadoop pig

Pig has two execution types or modes: local mode and Hadoop mode

Local

Pig runs in a single JVM and accesses the local filesystem. Starting form v0.7 it uses the Hadoop job runner.

Hadoop mode

Pig runs on a Hadoop cluster (you need to tell Pig about the version and point it to your Namenode and Jobtracker

30

Execution modes

Page 31: Hadoop pig

Pig resides on the user’s machine and can be independent from the Hadoop cluster

Pig is written in Java and is portable

Compiles into map reduce jobs and submit them to the cluster

No need to install anything extra on the cluster

31

Running Pig

Pig client

Page 32: Hadoop pig

Pig defines a DAG. A step-by-step set of operations, each performing a transformation

Pig defines a logical plan for these transformations:

32

How does it work

A = LOAD ’file' as (line); B = FOREACH A GENERATE FLATTEN(TOKENIZE(line)) AS word; C = GROUP B BY word; D = FOREACH C GENERATE group, COUNT(words); STORE D INTO ‘output’

• Parses, checks, & optimises • Plan the execution

• Maps & Reduces • Passes the jar to Hadoop • Monitor the progress

Page 33: Hadoop pig

Scalar type:

Int, Long, Float, Double, Chararray, Bytearray

Complex type representing nested structures:

Tuple: sequence of fields of any type

Bag: an unordered collection of tuples

Map: a set of key-value pairs. Keys must be atoms, values may be any type

Expressions:

used in Pig as a part of a statement; field name, position ($), arithmetic, conditional, comparison, Boolean, etc.

33

Data types & expressions

Page 34: Hadoop pig

Load / Store

Data loaders; PigStorage, BinStorage, BinaryStorage, TextLoader, PigDump

Evaluation

Many built-in functions MAX, COUNT, SUM, DIFF, SIZE…

Filter

A special type of eval function used by the FILTER operator. IsEmpty is a built-in function

Comparison

Function used in ORDER statement; ASC | DESC

34

Functions

Page 35: Hadoop pig

Schemas enable you to associate names and types of the fields in the relation

Schemas are optional but recommended whenever possible; type declarations result in better parse-time error checking and more efficient code execution

They are defined using the AS keyword with operators

Schema definition for simple data types:

> records = LOAD 'input/data' AS (id:int, date:chararray);

35

Schemas

Page 36: Hadoop pig

Each statement, defining a data processing operator / relation, produces a dataset with an alias

grunt> records = LOAD 'input/data' AS (id:int, date:chararray);

LOAD returns a tuple, which elements can be referenced by position or by name

Very useful operators are DUMP, ILLUSTRATE, and DESCRIBE

36

Statements and aliases

Page 37: Hadoop pig

Filter is user to work with tuples and rows of data

Select data you want, or remove the data you are not interested in

Filtering early in the processing pipeline minimises the amount of data flowing through the system, which can improve efficiency

grunt> filtered_records = FILTER records BY id == 234;

37

Filtering data

Page 38: Hadoop pig

Foreach .. Generate acts on columns on every row in a relation

grunt> ids = FOREACH records GENERATE id;

Positional reference. This statement has the same output grunt> ids = FOREACH records GENERATE $0;

The elements of ‘ids’ however are not named ‘id’ unless you add ‘AS id’ at the end of your statement

grunt> ids = FOREACH records GENERATE $0 AS id;

38

Foreach .. Generate

Page 39: Hadoop pig

Group .. by makes an output bag containing grouped fields with the same schema using a grouping key

Join performs inner, equijoin of two or more relations based on common field values.

You can also perform outer joins using keywords left, right and full

Cogroup is similar to Group, using multiple relations, and creates a nested set of output tuples

39

Grouping and joining

Page 40: Hadoop pig

Order imposes an order on the output to sort a relation by one or more fields

The Limit statement limits the number of results

Split partitions a relation into two or more relations

the Sample operator selects a random data sample with the stated sample size

the Union operator to merge the contents of two or more relations

40

Ordering, combining, splitting…

Page 41: Hadoop pig

The Stream operator allows to transform data in a relation using an external program or script

grunt> C = STREAM A THROUGH `cut -f 2`;

Extract the second field of A using cut

The scripts are shipped to the cluster using grunt> DEFINE script `script.py` SHIP (‘script.py’);

grunt> D = STREAM C THROUGH script AS (…);

41

Stream

Page 42: Hadoop pig

Support and a community of user-defined functions (UDFs)

UDFs can encapsulate users processing logic in filtering, comparison, evaluation, grouping, or storage

filter functions for instance are all subclasses of FilterFunc, which itself is a subclass of EvalFunc

PiggyBank: the Pig community sharing their UDFs

DataFu: Linkedin's collection of Pig UDFs

42

User defined functions

Page 43: Hadoop pig

package myudfs; import … public class UPPER extends EvalFunc<String> { public String exec(Tuple input) throws IOException { if (input == null || input.size() == 0) return null; try{ String str = (String) input.get(0); return str.toUpperCase(); }catch(Exception e){ throw WrappedIOException.wrap("Caught exception processing input row ", e); } } }

43

A simple eval UDF example

Page 44: Hadoop pig

An Example

Let’s find the top 5 most visited pages by users aged 18 – 25. Input: user data file, and page view data file.

Load Users Load Pages

Filter by age

Join on name

Group on url

Count clicks

Order by clicks

Take top 5

44

Page 45: Hadoop pig

Users = LOAD ‘users’ AS (name, age); Filtered = FILTER Users BY age >= 18 and age <= 25; Pages = LOAD ‘pages’ AS (user, url); Joined = JOIN Filtered BY name, Pages by user; Grouped = GROUP Joined BY url; Summed = FOREACH Grouped GENERATE group, count(Joined) AS clicks; Sorted = ORDER Summed BY clicks desc; Top5 = LIMIT Sorted 5;

STORE Top5 INTO ‘top5sites’;

A simple script

45

Page 46: Hadoop pig

In MapReduce! i m p o r t j a v a . i o . I O E x c e p t i o n ;

i m p o r t j a v a . u t i l . A r r a y L i s t ;

i m p o r t j a v a . u t i l . I t e r a t o r ;

i m p o r t j a v a . u t i l . L i s t ;

i m p o r t o r g . a p a c h e . h a d o o p . f s . P a t h ;

i m p o r t o r g . a p a c h e . h a d o o p . i o . L o n g W r i t a b l e ;

i m p o r t o r g . a p a c h e . h a d o o p . i o . T e x t ;

i m p o r t o r g . a p a c h e . h a d o o p . i o . W r i t a b l e ;

im p o r t o r g . a p a c h e . h a d o o p . i o . W r i t a b l e C o m p a r a b l e ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . F i l e I n p u t F o r m a t ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . F i l e O u t p u t F o r m a t ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . J o b C o n f ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . K e y V a l u e T e x t I n p u t F o r m a t ;

i m p o r t o r g . ap a c h e . h a d o o p . m a p r e d . M a p p e r ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . M a p R e d u c e B a s e ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . O u t p u t C o l l e c t o r ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . R e c o r d R e a d e r ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . R e d u c e r ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . R e p o r t e r ;

i m po r t o r g . a p a c h e . h a d o o p . m a p r e d . S e q u e n c e F i l e I n p u t F o r m a t ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . S e q u e n c e F i l e O u t p u t F o r m a t ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . T e x t I n p u t F o r m a t ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . j o b c o n t r o l . J o b ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . j o b c o n t r o l . J o b Co n t r o l ;

i m p o r t o r g . a p a c h e . h a d o o p . m a p r e d . l i b . I d e n t i t y M a p p e r ;

p u b l i c c l a s s M R E x a m p l e {

p u b l i c s t a t i c c l a s s L o a d P a g e s e x t e n d s M a p R e d u c e B a s e

i m p l e m e n t s M a p p e r < L o n g W r i t a b l e , T e x t , T e x t , T e x t > {

p u b l i c v o i d m a p ( L o n g W r i t a b l e k , T e x t v a l ,

O u t p u t C o l l e c t o r < T e x t , T e x t > o c ,

R e p o r t e r r e p o r t e r ) t h r o w s I O E x c e p t i o n {

/ / P u l l t h e k e y o u t

S t r i n g l i n e = v a l . t o S t r i n g ( ) ;

i n t f i r s t C o m m a = l i n e . i n d e x O f ( ' , ' ) ;

S t r i n g k e y = l i n e . s u bs t r i n g ( 0 , f i r s t C o m m a ) ;

S t r i n g v a l u e = l i n e . s u b s t r i n g ( f i r s t C o m m a + 1 ) ;

T e x t o u t K e y = n e w T e x t ( k e y ) ;

/ / P r e p e n d a n i n d e x t o t h e v a l u e s o w e k n o w w h i c h f i l e

/ / i t c a m e f r o m .

T e x t o u t V a l = n e w T e x t ( " 1" + v a l u e ) ;

o c . c o l l e c t ( o u t K e y , o u t V a l ) ;

}

}

p u b l i c s t a t i c c l a s s L o a d A n d F i l t e r U s e r s e x t e n d s M a p R e d u c e B a s e

i m p l e m e n t s M a p p e r < L o n g W r i t a b l e , T e x t , T e x t , T e x t > {

p u b l i c v o i d m a p ( L o n g W r i t a b l e k , T e x t v a l ,

O u t p u t C o l l e c t o r < T e x t , T e x t > o c ,

R e p o r t e r r e p o r t e r ) t h r o w s I O E x c e p t i o n {

/ / P u l l t h e k e y o u t

S t r i n g l i n e = v a l . t o S t r i n g ( ) ;

i n t f i r s t C o m m a = l i n e . i n d e x O f ( ' , ' ) ;

S t r i n g v a l u e = l i n e . s u b s t r i n g (f i r s t C o m m a + 1 ) ;

i n t a g e = I n t e g e r . p a r s e I n t ( v a l u e ) ;

i f ( a g e < 1 8 | | a g e > 2 5 ) r e t u r n ;

S t r i n g k e y = l i n e . s u b s t r i n g ( 0 , f i r s t C o m m a ) ;

T e x t o u t K e y = n e w T e x t ( k e y ) ;

/ / P r e p e n d a n i n d e x t o t h e v a l u e s o we k n o w w h i c h f i l e

/ / i t c a m e f r o m .

T e x t o u t V a l = n e w T e x t ( " 2 " + v a l u e ) ;

o c . c o l l e c t ( o u t K e y , o u t V a l ) ;

}

}

p u b l i c s t a t i c c l a s s J o i n e x t e n d s M a p R e d u c e B a s e

i m p l e m e n t s R e d u c e r < T e x t , T e x t , T e x t , T e x t > {

p u b l i c v o i d r e d u c e ( T e x t k e y ,

I t e r a t o r < T e x t > i t e r ,

O u t p u t C o l l e c t o r < T e x t , T e x t > o c ,

R e p o r t e r r e p o r t e r ) t h r o w s I O E x c e p t i o n {

/ / F o r e a c h v a l u e , f i g u r e o u t w h i c h f i l e i t ' s f r o m a n d

s t o r e i t

/ / a c c o r d i n g l y .

L i s t < S t r i n g > f i r s t = n e w A r r a y L i s t < S t r i n g > ( ) ;

L i s t < S t r i n g > s e c o n d = n e w A r r a y L i s t < S t r i n g > ( ) ;

w h i l e ( i t e r . h a s N e x t ( ) ) {

T e x t t = i t e r . n e x t ( ) ;

S t r i n g v a l u e = t . t oS t r i n g ( ) ;

i f ( v a l u e . c h a r A t ( 0 ) = = ' 1 ' )

f i r s t . a d d ( v a l u e . s u b s t r i n g ( 1 ) ) ;

e l s e s e c o n d . a d d ( v a l u e . s u b s t r i n g ( 1 ) ) ;

r e p o r t e r . s e t S t a t u s ( " O K " ) ;

}

/ / D o t h e c r o s s p r o d u c t a n d c o l l e c t t h e v a l u e s

f o r ( S t r i n g s 1 : f i r s t ) {

f o r ( S t r i n g s 2 : s e c o n d ) {

S t r i n g o u t v a l = k e y + " , " + s 1 + " , " + s 2 ;

o c . c o l l e c t ( n u l l , n e w T e x t ( o u t v a l ) ) ;

r e p o r t e r . s e t S t a t u s ( " O K " ) ;

}

}

}

}

p u b l i c s t a t i c c l a s s L o a d J o i n e d e x t e n d s M a p R e d u c e B a s e

i m p l e m e n t s M a p p e r < T e x t , T e x t , T e x t , L o n g W r i t a b l e > {

p u b l i c v o i d m a p (

T e x t k ,

T e x t v a l ,

O u t p u t C o l l ec t o r < T e x t , L o n g W r i t a b l e > o c ,

R e p o r t e r r e p o r t e r ) t h r o w s I O E x c e p t i o n {

/ / F i n d t h e u r l

S t r i n g l i n e = v a l . t o S t r i n g ( ) ;

i n t f i r s t C o m m a = l i n e . i n d e x O f ( ' , ' ) ;

i n t s e c o n d C o m m a = l i n e . i n d e x O f ( ' , ' , f i r s tC o m m a ) ;

S t r i n g k e y = l i n e . s u b s t r i n g ( f i r s t C o m m a , s e c o n d C o m m a ) ;

/ / d r o p t h e r e s t o f t h e r e c o r d , I d o n ' t n e e d i t a n y m o r e ,

/ / j u s t p a s s a 1 f o r t h e c o m b i n e r / r e d u c e r t o s u m i n s t e a d .

T e x t o u t K e y = n e w T e x t ( k e y ) ;

o c . c o l l e c t ( o u t K e y , n e w L o n g W r i t a b l e ( 1 L ) ) ;

}

}

p u b l i c s t a t i c c l a s s R e d u c e U r l s e x t e n d s M a p R e d u c e B a s e

i m p l e m e n t s R e d u c e r < T e x t , L o n g W r i t a b l e , W r i t a b l e C o m p a r a b l e ,

W r i t a b l e > {

p u b l i c v o i d r e d u c e (

T e x t k ey ,

I t e r a t o r < L o n g W r i t a b l e > i t e r ,

O u t p u t C o l l e c t o r < W r i t a b l e C o m p a r a b l e , W r i t a b l e > o c ,

R e p o r t e r r e p o r t e r ) t h r o w s I O E x c e p t i o n {

/ / A d d u p a l l t h e v a l u e s w e s e e

l o n g s u m = 0 ;

w hi l e ( i t e r . h a s N e x t ( ) ) {

s u m + = i t e r . n e x t ( ) . g e t ( ) ;

r e p o r t e r . s e t S t a t u s ( " O K " ) ;

}

o c . c o l l e c t ( k e y , n e w L o n g W r i t a b l e ( s u m ) ) ;

}

}

p u b l i c s t a t i c c l a s s L o a d C l i c k s e x t e n d s M a p R e d u c e B a s e

im p l e m e n t s M a p p e r < W r i t a b l e C o m p a r a b l e , W r i t a b l e , L o n g W r i t a b l e ,

T e x t > {

p u b l i c v o i d m a p (

W r i t a b l e C o m p a r a b l e k e y ,

W r i t a b l e v a l ,

O u t p u t C o l l e c t o r < L o n g W r i t a b l e , T e x t > o c ,

R e p o r t e r r e p o r t e r ) t h r o w s I O E x c e p t i o n {

o c . c o l l e c t ( ( L o n g W r i t a b l e ) v a l , ( T e x t ) k e y ) ;

}

}

p u b l i c s t a t i c c l a s s L i m i t C l i c k s e x t e n d s M a p R e d u c e B a s e

i m p l e m e n t s R e d u c e r < L o n g W r i t a b l e , T e x t , L o n g W r i t a b l e , T e x t > {

i n t c o u n t = 0 ;

p u b l i c v o i d r e d u c e (

L o n g W r i t a b l e k e y ,

I t e r a t o r < T e x t > i t e r ,

O u t p u t C o l l e c t o r < L o n g W r i t a b l e , T e x t > o c ,

R e p o r t e r r e p o r t e r ) t h r o w s I O E x c e p t i o n {

/ / O n l y o u t p u t t h e f i r s t 1 0 0 r e c o r d s

w h i l e ( c o u n t < 1 0 0 & & i t e r . h a s N e x t ( ) ) {

o c . c o l l e c t ( k e y , i t e r . n e x t ( ) ) ;

c o u n t + + ;

}

}

}

p u b l i c s t a t i c v o i d m a i n ( S t r i n g [ ] a r g s ) t h r o w s I O E x c e p t i o n {

J o b C o n f l p = n e w J o b C o n f ( M R E x a m p l e . c l a s s ) ;

l p . s et J o b N a m e ( " L o a d P a g e s " ) ;

l p . s e t I n p u t F o r m a t ( T e x t I n p u t F o r m a t . c l a s s ) ;

l p . s e t O u t p u t K e y C l a s s ( T e x t . c l a s s ) ;

l p . s e t O u t p u t V a l u e C l a s s ( T e x t . c l a s s ) ;

l p . s e t M a p p e r C l a s s ( L o a d P a g e s . c l a s s ) ;

F i l e I n p u t F o r m a t . a d d I n p u t P a t h ( l p , n e w

P a t h ( " /u s e r / g a t e s / p a g e s " ) ) ;

F i l e O u t p u t F o r m a t . s e t O u t p u t P a t h ( l p ,

n e w P a t h ( " / u s e r / g a t e s / t m p / i n d e x e d _ p a g e s " ) ) ;

l p . s e t N u m R e d u c e T a s k s ( 0 ) ;

J o b l o a d P a g e s = n e w J o b ( l p ) ;

J o b C o n f l f u = n e w J o b C o n f ( M R E x a m p l e . c l a s s ) ;

l f u . se t J o b N a m e ( " L o a d a n d F i l t e r U s e r s " ) ;

l f u . s e t I n p u t F o r m a t ( T e x t I n p u t F o r m a t . c l a s s ) ;

l f u . s e t O u t p u t K e y C l a s s ( T e x t . c l a s s ) ;

l f u . s e t O u t p u t V a l u e C l a s s ( T e x t . c l a s s ) ;

l f u . s e t M a p p e r C l a s s ( L o a d A n d F i l t e r U s e r s . c l a s s ) ;

F i l e I n p u t F o r m a t . a d dI n p u t P a t h ( l f u , n e w

P a t h ( " / u s e r / g a t e s / u s e r s " ) ) ;

F i l e O u t p u t F o r m a t . s e t O u t p u t P a t h ( l f u ,

n e w P a t h ( " / u s e r / g a t e s / t m p / f i l t e r e d _ u s e r s " ) ) ;

l f u . s e t N u m R e d u c e T a s k s ( 0 ) ;

J o b l o a d U s e r s = n e w J o b ( l f u ) ;

J o b C o n f j o i n = n e w J o b C o n f (M R E x a m p l e . c l a s s ) ;

j o i n . s e t J o b N a m e ( " J o i n U s e r s a n d P a g e s " ) ;

j o i n . s e t I n p u t F o r m a t ( K e y V a l u e T e x t I n p u t F o r m a t . c l a s s ) ;

j o i n . s e t O u t p u t K e y C l a s s ( T e x t . c l a s s ) ;

j o i n . s e t O u t p u t V a l u e C l a s s ( T e x t . c l a s s ) ;

j o i n . s e t M a p p e r C l a s s ( I d e n t i t y M a pp e r . c l a s s ) ;

j o i n . s e t R e d u c e r C l a s s ( J o i n . c l a s s ) ;

F i l e I n p u t F o r m a t . a d d I n p u t P a t h ( j o i n , n e w

P a t h ( " / u s e r / g a t e s / t m p / i n d e x e d _ p a g e s " ) ) ;

F i l e I n p u t F o r m a t . a d d I n p u t P a t h ( j o i n , n e w

P a t h ( " / u s e r / g a t e s / t m p / f i l t e r e d _ u s e r s " ) ) ;

F i l e O u t p u t F o r m a t . s et O u t p u t P a t h ( j o i n , n e w

P a t h ( " / u s e r / g a t e s / t m p / j o i n e d " ) ) ;

j o i n . s e t N u m R e d u c e T a s k s ( 5 0 ) ;

J o b j o i n J o b = n e w J o b ( j o i n ) ;

j o i n J o b . a d d D e p e n d i n g J o b ( l o a d P a g e s ) ;

j o i n J o b . a d d D e p e n d i n g J o b ( l o a d U s e r s ) ;

J o b C o n f g r o u p = n e w J o b C o n f ( M R Ex a m p l e . c l a s s ) ;

g r o u p . s e t J o b N a m e ( " G r o u p U R L s " ) ;

g r o u p . s e t I n p u t F o r m a t ( K e y V a l u e T e x t I n p u t F o r m a t . c l a s s ) ;

g r o u p . s e t O u t p u t K e y C l a s s ( T e x t . c l a s s ) ;

g r o u p . s e t O u t p u t V a l u e C l a s s ( L o n g W r i t a b l e . c l a s s ) ;

g r o u p . s e t O u t p u t F o r m a t ( S e q u e n c e F il e O u t p u t F o r m a t . c l a s s ) ;

g r o u p . s e t M a p p e r C l a s s ( L o a d J o i n e d . c l a s s ) ;

g r o u p . s e t C o m b i n e r C l a s s ( R e d u c e U r l s . c l a s s ) ;

g r o u p . s e t R e d u c e r C l a s s ( R e d u c e U r l s . c l a s s ) ;

F i l e I n p u t F o r m a t . a d d I n p u t P a t h ( g r o u p , n e w

P a t h ( " / u s e r / g a t e s / t m p / j o i n e d " ) ) ;

F i l e O u t p u t F o r m a t . s e t O u t p u t P a t h ( g r o u p , n e w

P a t h ( " / u s e r / g a t e s / t m p / g r o u p e d " ) ) ;

g r o u p . s e t N u m R e d u c e T a s k s ( 5 0 ) ;

J o b g r o u p J o b = n e w J o b ( g r o u p ) ;

g r o u p J o b . a d d D e p e n d i n g J o b ( j o i n J o b ) ;

J o b C o n f t o p 1 0 0 = n e w J o b C o n f ( M R E x a m p l e . c l a s s ) ;

t o p 1 0 0 . s e t J o b N a m e ( " T o p 1 0 0 s i t e s " ) ;

t o p 1 0 0 . s e t I n p u t F o r m a t ( S e q u e n c e F i l e I n p u t F o r m a t . c l a s s ) ;

t o p 1 0 0 . s e t O u t p u t K e y C l a s s ( L o n g W r i t a b l e . c l a s s ) ;

t o p 1 0 0 . s e t O u t p u t V a l u e C l a s s ( T e x t . c l a s s ) ;

t o p 1 0 0 . s e t O u t p u t F o r m a t ( S e q u e n c e F i l e O u t p u t Fo r m a t . c l a s s ) ;

t o p 1 0 0 . s e t M a p p e r C l a s s ( L o a d C l i c k s . c l a s s ) ;

t o p 1 0 0 . s e t C o m b i n e r C l a s s ( L i m i t C l i c k s . c l a s s ) ;

t o p 1 0 0 . s e t R e d u c e r C l a s s ( L i m i t C l i c k s . c l a s s ) ;

F i l e I n p u t F o r m a t . a d d I n p u t P a t h ( t o p 1 0 0 , n e w

P a t h ( " / u s e r / g a t e s / t m p / g r o u p e d " ) ) ;

F i l e O u t p u t F o r m a t . s e t O u t p u t P a t h ( t o p 1 0 0 , n e w

P a t h ( " / u s e r / g a t e s / t o p 1 0 0 s i t e s f o r u s e r s 1 8 t o 2 5 " ) ) ;

t o p 1 0 0 . s e t N u m R e d u c e T a s k s ( 1 ) ;

J o b l i m i t = n e w J o b ( t o p 1 0 0 ) ;

l i m i t . a d d D e p e n d i n g J o b ( g r o u p J o b ) ;

J o b C o n t r o l j c = n e w J o b C o n t r o l ( " F i n d t o p 1 0 0 s i t e s f o r u s e r s

1 8 t o 2 5 " ) ;

j c . a d d J o b ( l o a d P a g e s ) ;

j c . a d d J o b ( l o a d U s e r s ) ;

j c . a d d J o b ( j o i n J o b ) ;

j c . a d d J o b ( g r o u p J o b ) ;

j c . a d d J o b ( l i m i t ) ;

j c . r u n ( ) ;

}

}

46

Page 47: Hadoop pig

Ease of Translation

Users = LOAD … Filtered = FILTER … Pages = LOAD … Joined = JOIN … Grouped = GROUP … Summed = … COUNT()… Sorted = ORDER … Top5 = LIMIT …

Load Users Load Pages

Filter by age

Join on name

Group on url

Count clicks

Order by clicks

Take top 5

47

Page 48: Hadoop pig

Cassandra has gained some significant integration points with Hadoop and its analytics tools

In order to achieve Hadoop’s data locality, Cassandra nodes must be part of the Hadoop cluster by running a tasktracker process. So the namenode and jobtracker can reside outside of the Cassandra cluster

48

The Hadoop/Pig/Cassandra stack

A three- node Cassandra/Hadoop cluster with external namenode / jobtracker

Page 49: Hadoop pig

Cassandra has a Java source package for Hadoop integration org.apache.cassandra.hadoop

ColumnFamilyInputFormat extends InputFormat

ColumnFamilyOutputFormat extends OutputFormat

ConfigHelper a helper class to configure Cassandra-specific information

Hadoop output streaming was introduced in 0.7 but removed from 0.8

49

Hadoop jobs

Page 50: Hadoop pig

The Pig integration CassandraStorage() (a LoadFunc implementation) allows Pig to Load/Store data from/to Cassandra grunt> LOAD 'cassandra://Keyspace/cf' USING CassandraStorage();

The pig_cassandra script, shipped with Cassandra source, performs the necessary initialisation (Pig environments variables still needs to be set)

Pygmalion is a set of scripts and UDFs to facilitate the use of Pig alongside Cassandra

50

Pig alongside Cassandra

Page 51: Hadoop pig

A workflow system provides an infrastructure to set up & manage a sequence of interdependent jobs / set of jobs

The hadoop ecosystem includes a set of workflow tools to run applications over MapReduce processes or High-level languages

Cascading (http://www.cascading.org/). A java library defining data processing workflows and rendering them to MapReduce jobs

Oozie (http://yahoo.github.com/oozie/)

51

Workflow

Page 52: Hadoop pig

Some links

http://hadoop.apache.org

http://pig.apache.org/

https://cwiki.apache.org/confluence/display/PIG/Index

PiggyBank: https://cwiki.apache.org/confluence/display/PIG/PiggyBank

DataFu: https://github.com/linkedin/datafu

Pygmalion: https://github.com/jeromatron/pygmalion

http://code.google.com/edu/parallel/mapreduce-tutorial.html

Video tutorials from Cloudera: http://www.cloudera.com/hadoop-training

Interesting papers: http://bit.ly/rskJho - Original MapReduce paper

http://bit.ly/KvFXxT - Pig paper: ‘Building a High-Level Dataflow System on top of MapReduce: The Pig Experience’

52

Page 53: Hadoop pig

53

A simple data flow

Load checkins data

Keep only the two ids

Group by user/loc id & Order

Limit to top 50

Top 50 users / locations [same script, different group key]

Page 54: Hadoop pig

54

Another data flow

Load checkins data

Split_date

Group by date

Count the tuples Group by weeks using

Stream

All the checkins, over weeks