Using Ruby to do Map/Reduce with Hadoop

5/12/11 James Kebinger


DESCRIPTION

Ruby user group presentation introducing map/reduce and showing how to use Ruby to process data with map/reduce on Hadoop clusters.

TRANSCRIPT

Page 1: Using Ruby to do Map/Reduce with Hadoop

5/12/11 James Kebinger

Page 2: Using Ruby to do Map/Reduce with Hadoop

Agenda

  Introduction/Who am I?

  Map/Reduce basics

  Wukong

  Hadoop and Amazon Elastic Map Reduce

  Real Examples

  Pig, Other Tools


Page 3: Using Ruby to do Map/Reduce with Hadoop

Introduction

  James Kebinger

  Software Engineer/Data Analyst

  Data Team at PatientsLikeMe

  Ruby, SQL, R, Hadoop/Pig

[email protected]

  @monkeyatlarge on twitter

  Blogs at Monkeyatlarge.com


Page 4: Using Ruby to do Map/Reduce with Hadoop

Big Data?

  Flexibility is key

  Keep the whole haystack, figure out the needles later

  Don’t need to plan what fields to keep ahead of time

  Store everything you can afford to – on cheap storage

  Be able to get answers before you forget the question


Page 5: Using Ruby to do Map/Reduce with Hadoop

Examples

  Extract session summaries from archived weblogs

  Match patients to treatment centers en masse to get an estimate of the distribution


Page 6: Using Ruby to do Map/Reduce with Hadoop

Map/Reduce Ruby Style

nums = [1, 2, 4, 5, 6]

by_2 = nums.map { |num| num * 2 }

sums_by_2 = by_2.reduce { |memo, obj| memo + obj }
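For the sample array, by_2 comes out as [2, 4, 8, 10, 12] and sums_by_2 as 36; the same two ideas, applied to data too big for one machine, are what Hadoop distributes.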


Page 7: Using Ruby to do Map/Reduce with Hadoop

Map/Reduce

[Diagram: the input text "Monkey foo bar zebra dog Cat dog baz foo" passes through Map, which emits the pairs "Monkey 1", "Zebra 1", "Dog 1", "Cat 1", "Zebra 1", "Dog 1"; the pairs then flow into Sort]

  Mapper transforms or filters input

  Emits 0 to n outputs for each line/record input

  Each output starts with the Key, followed by 0 or more values

Page 8: Using Ruby to do Map/Reduce with Hadoop

Map/Reduce

STOP_WORDS = %w{foo bar baz}
ARGF.each_line do |line|
  line.split(/\s+/).each do |word|
    puts [word, 1].join("\t") unless STOP_WORDS.include? word
  end
end
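Run on its own, the mapper is an ordinary line filter; using the script name from the Unix-pipes slide:

echo "Monkey foo bar zebra dog" | ruby wordcount-simple-mapper.rb
Monkey	1
zebra	1
dog	1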


Page 9: Using Ruby to do Map/Reduce with Hadoop

Map/Reduce

[Diagram: the sorted pairs "Cat 1", "Dog 1", "Dog 1", "Monkey 1", "Zebra 1", "Zebra 1" pass through Reduce, which emits "Cat 1", "Dog 2", "Monkey 1", "Zebra 2"]

  Reducer receives input sorted by key(s)

  Applies an operation to all data with the same key

  Emits the result

Page 10: Using Ruby to do Map/Reduce with Hadoop

Map/Reduce

last_word = nil
current_count = 0
ARGF.each do |line|
  word, count = line.split("\t")
  if word != last_word
    puts [last_word, current_count].join("\t") unless last_word.nil?
    last_word = word
    current_count = 0
  end
  current_count += count.to_i
end
puts [last_word, current_count].join("\t") unless last_word.nil?
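Fed pre-sorted pairs on stdin, the reducer collapses each run of identical keys:

printf "Cat\t1\nDog\t1\nDog\t1\n" | ruby wordcount-simple-reducer.rb
Cat	1
Dog	2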


Page 11: Using Ruby to do Map/Reduce with Hadoop

With Unix Pipes

cat SOMEFILE | \
  ruby wordcount-simple-mapper.rb | \
  sort | \
  ruby wordcount-simple-reducer.rb


Page 12: Using Ruby to do Map/Reduce with Hadoop

Enter Wukong

  Project out of InfoChimps

  Deals with much of the minutiae

  Base classes for mappers and reducers

  Lets your script deal with lines, records or objects as needed

  Nice logging showing data flow

  Support for launching locally, Amazon EMR or your own Hadoop cluster

  https://github.com/mrflip/wukong


Page 13: Using Ruby to do Map/Reduce with Hadoop

Wukong Word Count


module WordCount
  class Mapper < Wukong::Streamer::LineStreamer
    def process line
      words = line.strip.split(/\W+/).reject(&:blank?)
      words.each { |word| yield [word, 1] }
    end
  end

  class Reducer < Wukong::Streamer::ListReducer
    def finalize
      yield [key, values.map(&:last).map(&:to_i).sum]
    end
  end
end

Wukong::Script.new(WordCount::Mapper, WordCount::Reducer).run
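Saved as, say, word_count.rb (filename assumed), the script runs end to end locally with the launcher arguments shown later on the Same Code, Two Speeds slide:

ruby word_count.rb --run=local input_file output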

Page 14: Using Ruby to do Map/Reduce with Hadoop

Data Accumulation


class Reducer < Wukong::Streamer::AccumulatingReducer
  def start! word, count
    @word = word
    @count = 0
  end

  def accumulate word, count
    @count += count.to_i
  end

  def finalize
    yield [@word, @count]
  end
end
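Wukong's AccumulatingReducer drives these hooks for you: start! fires on the first record of each new key, accumulate on every record sharing that key, and finalize once the key changes or input ends, so the last_word bookkeeping from the hand-rolled reducer disappears.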

Page 15: Using Ruby to do Map/Reduce with Hadoop

Super Size Me


Page 16: Using Ruby to do Map/Reduce with Hadoop

Hadoop

  Hadoop is the Java implementation of Map/Reduce

  Industrial scale

  Tools to manage clusters of machines

  Distributed File System

  Move data around

  Move your scripts to the data

  Used to need Java

  Hadoop Streaming allows data to be piped through scripts in any language

  Map-reduce becoming a little bit like assembly


Page 17: Using Ruby to do Map/Reduce with Hadoop

Hadoop Data Flow


Page 18: Using Ruby to do Map/Reduce with Hadoop

Amazon EMR

  Amazon Elastic MapReduce

  Stick your data and code in S3

  Pick how many machines and how big

  Light up a cluster, process data and shut it down


Page 19: Using Ruby to do Map/Reduce with Hadoop

Same Code, Two Speeds

  Change the run argument from “local”* to:

  “hadoop” to run on your own Hadoop cluster

  “emr” to run on Amazon Elastic MapReduce


ruby wukong_test.rb --run=local input_file output

* And a few more arguments for EMR

ruby wukong_test.rb --run=emr --key_pair=jamesk-mbp --emr_root=s3://jkk-plm \
  --instance-type=c1.medium --num-instances=4 \
  s3://jkk-plm/testing/input/* s3://jkk-plm/testing/output

Page 20: Using Ruby to do Map/Reduce with Hadoop

For Comparison

elastic-mapreduce \
  --create --name=myjob \
  --key-pair=jamesk-mbp \
  --slave-instance-type=c1.medium --num-instances 3 \
  --bootstrap-action=s3://MYBUCKET/emr_bootstrap.sh \
  --credentials LOCAL_PATH_TO/credentials.json \
  --log-uri=s3://MYBUCKET/logs \
  --stream \
  --cache s3n://MYBUCKET/code/my_mapper.rb \
  --cache s3n://MYBUCKET/code/my_reducer.rb \
  --mapper='/usr/bin/ruby1.8 my_mapper.rb' \
  --reducer='/usr/bin/ruby1.8 my_reducer.rb' \
  --input=s3n://MYBUCKET/input/*.bz2 \
  --output=s3n://MYBUCKET


Page 21: Using Ruby to do Map/Reduce with Hadoop

Closest Location (map)

class Mapper < Wukong::Streamer::RecordStreamer
  def process person_id, person_lat, person_lon, location_id, location_lat, location_lon
    distance = haversine_distance(person_lat.to_f, person_lon.to_f,
                                  location_lat.to_f, location_lon.to_f)
    yield [person_id, distance, location_id] if distance < 3000
  end
end
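The mapper calls haversine_distance, which isn't shown in the transcript; a minimal sketch of the standard haversine great-circle formula under that name could look like the following (the Earth radius constant, and hence the units behind the distance < 3000 cutoff, are assumptions):

include Math

# Assumed unit; use 6371.0 for kilometers instead.
EARTH_RADIUS_MILES = 3959.0

# Great-circle distance between two points given in decimal degrees.
def haversine_distance lat1, lon1, lat2, lon2
  dlat = (lat2 - lat1) * PI / 180
  dlon = (lon2 - lon1) * PI / 180
  a = sin(dlat / 2)**2 +
      cos(lat1 * PI / 180) * cos(lat2 * PI / 180) * sin(dlon / 2)**2
  2 * EARTH_RADIUS_MILES * asin(sqrt(a))
end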


Page 22: Using Ruby to do Map/Reduce with Hadoop

Closest Location (reduce)

class Reducer < Wukong::Streamer::AccumulatingReducer
  def start! person_id, distance, location_id
    @person_id = person_id
    @closest_distance = distance.to_f  # fields arrive as strings from the stream
    @closest_location_id = location_id
  end

  def accumulate person_id, distance, location_id
    if @closest_distance > distance.to_f
      @closest_distance = distance.to_f
      @closest_location_id = location_id
    end
  end

  def finalize
    yield [@person_id, @closest_location_id, @closest_distance]
  end
end


Page 23: Using Ruby to do Map/Reduce with Hadoop

Secondary Sort

  Data is sorted by key before the reduce phase, then partitioned among reducers

  Take advantage of that to get sorted input to each reducer

  Split and sort by different keys with a couple of arguments added to the launch (a full invocation sketch follows the diagram below):

  num.key.fields.for.partition=1

  stream.num.map.output.key.fields=2

  partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

[Diagram: the map output
  A12 45.6 B12 / A12 3.4 B13 / A12 34.3 B10 / A27 20.1 B11 / A99 23.0 B10 / A99 20.1 B11
is sorted on the first two fields and partitioned on the first, so each reducer sees
  A12 3.4 B13 / A12 34.3 B10 / A12 45.6 B12 / A27 20.1 B11 / A99 20.1 B11 / A99 23.0 B10]
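A sketch of how those three settings might be combined on a Hadoop Streaming launch (jar path, script names and paths are placeholders):

hadoop jar hadoop-streaming.jar \
  -D stream.num.map.output.key.fields=2 \
  -D num.key.fields.for.partition=1 \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -mapper my_mapper.rb \
  -reducer my_reducer.rb \
  -input input/ \
  -output output/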

Page 24: Using Ruby to do Map/Reduce with Hadoop

Session Splitting Example

  Get smarter later

  How long do logged-in users of a certain class spend on the site?

  Scan weblogs in time order for each user, splitting when clicks are at least N minutes apart

  With a little Ruby and a credit card, you can know the answer in a few hours


Page 25: Using Ruby to do Map/Reduce with Hadoop

class Reducer < Wukong::Streamer::AccumulatingReducer
  GAP = 0.5 # hours

  attr_accessor :sessions

  def start! *args
    @sessions = []
  end

  def accumulate user_id, date_str, page_name
    date = DateTime.parse(date_str)
    curr_session = sessions.last
    if curr_session.nil? || (date - curr_session[:end]) * 24 > GAP
      curr_session = { :start => date, :count => 0, :pages => Hash.new(0) }
      sessions << curr_session
    end
    curr_session[:end] = date
    curr_session[:count] += 1
    curr_session[:pages][page_name] += 1
  end

  def finalize
    sessions.each do |s|
      yield [key, s[:start], s[:end], …]
    end
  end
end
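Since DateTime subtraction yields days, (date - curr_session[:end]) * 24 is the gap in hours; with GAP = 0.5, a new session starts whenever consecutive clicks are more than 30 minutes apart.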


Page 26: Using Ruby to do Map/Reduce with Hadoop

Gotchas

  Mappers run one per file – so upload data in chunks

  Except LZO, which needs an index. Bzip2 splitting is around the corner?

  Getting dependencies onto all the machines

  Bootstrap scripts

  Package files into the distributed cache; they will be symlinked into the working directory

  If your job crashes, you pay for the whole hour


Page 27: Using Ruby to do Map/Reduce with Hadoop

Bootstrap Script


sudo apt-get update
sudo apt-get -y install ruby1.8-dev
wget https://jkk-plm.s3.amazonaws.com/bootstrap-deps/rubygems-1.7.2.tgz
tar xvfz rubygems-1.7.2.tgz
cd rubygems-1.7.2 && sudo ruby setup.rb
sudo gem install wukong json --no-rdoc --no-ri
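EMR runs the bootstrap action on every node as the cluster comes up, so ruby1.8, rubygems and the wukong gem are in place before the first streaming task launches.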

Page 28: Using Ruby to do Map/Reduce with Hadoop

Orchestration

  Examples are still at the level of map-reduce

  Sophisticated workflows have many maps and reduces

  Growing selection of tools to move up a level of abstraction

  Write a data flow, compile it down into a series of maps and reduces


Page 29: Using Ruby to do Map/Reduce with Hadoop

Apache Pig

  Data types

  Int/Float/Date/String

  Tuple

  Bag

  High level operations to join, filter, sort and group data

  Min/Max/Count

  And you can still call out to scripting languages…

  Handy tool even running on one machine


Page 30: Using Ruby to do Map/Reduce with Hadoop

Haversine Distance, Pig


Page 31: Using Ruby to do Map/Reduce with Hadoop

Other High Level Tools

  Hive: Like SQL

  Cascading: Pipe metaphor. Java, but works with JRuby


Page 32: Using Ruby to do Map/Reduce with Hadoop

Further Reading

  Data Recipes Blog: thedatachef.blogspot.com

  Other tools for Ruby, Python

  MRTool

  List on wukong github


Page 33: Using Ruby to do Map/Reduce with Hadoop

Thank You

  James Kebinger

[email protected]
