Using Ruby to do Map/Reduce with Hadoop
DESCRIPTION
Ruby user group presentation introducing map/reduce and how to use Ruby to process data with map/reduce on Hadoop clusters.
TRANSCRIPT
5/12/11 James Kebinger
Agenda Introduction/Who am I?
Map/Reduce basics
Wukong
Hadoop and Amazon Elastic Map Reduce
Real Examples
Pig, Other Tools
Introduction James Kebinger
Software Engineer/Data Analyst
Data Team at PatientsLikeMe
Ruby, SQL, R, Hadoop/Pig
@monkeyatlarge on twitter
Blogs at Monkeyatlarge.com
Big Data? Flexibility is key
Keep the whole haystack, figure out the needles later
Don’t need to plan what fields to keep ahead of time
Store everything you can afford to – on cheap storage
Be able to get answers before you forget the question
Examples Extract session summaries from archived weblogs
Match patients to treatment centers en masse to get an estimate of the distribution
Map/Reduce Ruby Style
nums = [1,2,4,5,6]
by_2 = nums.map {|num| num * 2}
sums_by_2 = by_2.reduce {|memo, obj| memo + obj}
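Scaled up, the same two calls become word counting; here is a plain-Ruby sketch of the map, shuffle and reduce phases (not from the slides, just an illustration of the pattern):

```ruby
words = %w{monkey zebra dog cat zebra dog}

pairs  = words.map { |w| [w, 1] }            # map: emit (word, 1) pairs
counts = pairs.group_by(&:first)             # shuffle: group pairs by key
              .map { |w, ps| [w, ps.map(&:last).reduce(:+)] }  # reduce: sum

counts.to_h  # => {"monkey"=>1, "zebra"=>2, "dog"=>2, "cat"=>1}
```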
Map/Reduce

Monkey foo bar zebra dog Cat dog baz foo
   |  Map
   v
Monkey 1   Zebra 1   Dog 1   Cat 1   Zebra 1   Dog 1
   |  Sort
   v
Mapper transforms or filters input
Emits 0 to n outputs for each input line/record
Each output starts with the key, followed by 0 or more values
Map/Reduce
STOP_WORDS = %w{foo bar baz}
ARGF.each_line do |line|
  line.split(/\s+/).each do |word|
    puts [word, 1].join("\t") unless STOP_WORDS.include?(word)
  end
end
Map/Reduce

Sorted map output:  Cat 1   Dog 1   Dog 1   Monkey 1   Zebra 1   Zebra 1
   |  Reduce
   v
Reduced output:     Cat 1   Dog 2   Monkey 1   Zebra 2
Reducer receives input sorted by key(s)
Applies an operation to all data with the same key
Emits the result
Map/Reduce

last_word = nil
current_count = 0
ARGF.each do |line|
  word, count = line.split("\t")
  if word != last_word
    puts [last_word, current_count].join("\t") unless last_word.nil?
    last_word = word
    current_count = 0
  end
  current_count += count.to_i
end
puts [last_word, current_count].join("\t") unless last_word.nil?
With Unix Pipes

cat SOMEFILE | \
  ruby wordcount-simple-mapper.rb | \
  sort | \
  ruby wordcount-simple-reducer.rb
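The same pipeline can also be simulated inside a single Ruby process before reaching for Hadoop; a sketch under my own naming (map_lines and reduce_lines are not from the talk), keeping the streaming contract of tab-separated "key\tvalue" lines throughout:

```ruby
# Simulates `mapper | sort | reducer` in-process.
STOP_WORDS = %w{foo bar baz}

def map_lines(lines)
  lines.flat_map { |l| l.split(/\s+/) }
       .reject   { |w| STOP_WORDS.include?(w) }
       .map      { |w| "#{w}\t1" }
end

def reduce_lines(sorted)
  # Consecutive lines with the same key form one group, exactly as a
  # streaming reducer sees them after the sort phase.
  sorted
    .chunk_while { |a, b| a.split("\t").first == b.split("\t").first }
    .map do |group|
      word  = group.first.split("\t").first
      total = group.sum { |l| l.split("\t").last.to_i }
      "#{word}\t#{total}"
    end
end

reduce_lines(map_lines(["monkey foo zebra", "dog zebra"]).sort)
# => ["dog\t1", "monkey\t1", "zebra\t2"]
```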
Enter Wukong Project out of InfoChimps
Deals with much of the minutiae
Base classes for mappers and reducers
Lets your script deal with lines, records or objects as needed
Nice logging showing data flow
Support for launching locally, Amazon EMR or your own Hadoop cluster
https://github.com/mrflip/wukong
Wukong Word Count
module WordCount
  class Mapper < Wukong::Streamer::LineStreamer
    def process line
      words = line.strip.split(/\W+/).reject(&:blank?)
      words.each { |word| yield [word, 1] }
    end
  end

  class Reducer < Wukong::Streamer::ListReducer
    def finalize
      yield [key, values.map(&:last).map(&:to_i).sum]
    end
  end
end

Wukong::Script.new(WordCount::Mapper, WordCount::Reducer).run
Data Accumulation
class Reducer < Wukong::Streamer::AccumulatingReducer
  def start! word, count
    @word  = word
    @count = 0
  end

  def accumulate word, count
    @count += count.to_i
  end

  def finalize
    yield [@word, @count]
  end
end
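The accumulating lifecycle (start!, accumulate, finalize) can be exercised without Hadoop or Wukong by driving the callbacks directly; a toy harness of my own (Wukong's real driver differs, this only illustrates the callback order):

```ruby
# start! fires on the first record of each key, accumulate on every
# record, finalize when the key changes or input ends.
class CountReducer
  def start!(word, _count)
    @word  = word
    @count = 0
  end

  def accumulate(_word, count)
    @count += count.to_i
  end

  def finalize
    yield [@word, @count]
  end
end

def run_reducer(reducer, sorted_records)
  out, last_key = [], nil
  sorted_records.each do |rec|
    if rec.first != last_key
      reducer.finalize { |r| out << r } unless last_key.nil?
      reducer.start!(*rec)
      last_key = rec.first
    end
    reducer.accumulate(*rec)
  end
  reducer.finalize { |r| out << r } unless last_key.nil?
  out
end

run_reducer(CountReducer.new, [["cat", "1"], ["dog", "1"], ["dog", "1"]])
# => [["cat", 1], ["dog", 2]]
```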
Super Size Me
Hadoop Hadoop is the Java implementation of Map/Reduce
Industrial scale
Tools to manage clusters of machines
Distributed File System
Move data around
Move your scripts to the data
Used to need Java
Hadoop Streaming allows data to be piped through scripts in any language
Map-reduce becoming a little bit like assembly
Hadoop Data Flow
Amazon EMR Amazon Elastic MapReduce
Stick your data and code in S3
Pick how many machines and how big
Light up a cluster, process data and shut it down
Same Code, Two Speeds
Changing the run argument from local* to
“hadoop” to run on a hadoop cluster
“emr” to run on Amazon Elastic Map Reduce
ruby wukong_test.rb --run=local input_file output
* And a few more arguments for EMR
ruby wukong_test.rb --run=emr --key_pair=jamesk-mbp --emr_root=s3://jkk-plm --instance-type=c1.medium --num-instances=4 s3://jkk-plm/testing/input/* s3://jkk-plm/testing/output
For Comparison
elastic-mapreduce \
  --create --name=myjob \
  --key-pair=jamesk-mbp \
  --slave-instance-type=c1.medium --num-instances 3 \
  --bootstrap-action=s3://MYBUCKET/emr_bootstrap.sh \
  --credentials LOCAL_PATH_TO/credentials.json \
  --log-uri=s3://MYBUCKET/logs \
  --stream \
  --cache s3n://MYBUCKET/code/my_mapper.rb \
  --cache s3n://MYBUCKET/code/my_reducer.rb \
  --mapper='/usr/bin/ruby1.8 my_mapper.rb' \
  --reducer='/usr/bin/ruby1.8 my_reducer.rb' \
  --input=s3n://MYBUCKET/input/*.bz2 \
  --output=s3n://MYBUCKET
Closest Location (map)

class Mapper < Wukong::Streamer::RecordStreamer
  def process person_id, person_lat, person_lon, location_id, location_lat, location_lon
    distance = haversine_distance(person_lat.to_f, person_lon.to_f,
                                  location_lat.to_f, location_lon.to_f)
    yield [person_id, distance, location_id] if distance < 3000
  end
end
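The mapper calls a haversine_distance helper the slides don't show; here is a plausible Ruby sketch (the units, kilometres, and the exact signature are my assumptions, not from the talk):

```ruby
# Great-circle distance between two lat/lon points (degrees) using the
# haversine formula on a spherical Earth, R ≈ 6371 km. Returns km.
def haversine_distance(lat1, lon1, lat2, lon2)
  rad  = Math::PI / 180.0
  dlat = (lat2 - lat1) * rad
  dlon = (lon2 - lon1) * rad
  a = Math.sin(dlat / 2)**2 +
      Math.cos(lat1 * rad) * Math.cos(lat2 * rad) * Math.sin(dlon / 2)**2
  2 * 6371.0 * Math.asin(Math.sqrt(a))
end
```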
Closest Location (reduce)

class Reducer < Wukong::Streamer::AccumulatingReducer
  def start! person_id, distance, location_id
    @person_id           = person_id
    @closest_distance    = distance.to_f  # fields arrive as strings; compare numerically
    @closest_location_id = location_id
  end

  def accumulate person_id, distance, location_id
    if @closest_distance > distance.to_f
      @closest_distance    = distance.to_f
      @closest_location_id = location_id
    end
  end

  def finalize
    yield [@person_id, @closest_location_id, @closest_distance]
  end
end
Secondary Sort

Data is sorted by key before the reduce phase, then partitioned
Take advantage of that to get sorted input to each reducer
Split and sort by different keys using a couple of arguments
Add args to launch:
  -D num.key.fields.for.partition=1
  -D stream.num.map.output.key.fields=2
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
A12 45.6 B12
A12  3.4 B13
A12 34.3 B10
A27 20.1 B11
A99 23.0 B10
A99 20.1 B11
   |  Sort and partition
   v
A12  3.4 B13
A12 34.3 B10
A12 45.6 B12
A27 20.1 B11
A99 20.1 B11
A99 23.0 B10
Session Splitting Example Get smarter later
How long do logged-in users of a certain class spend on the site?
Scan weblogs in time order for each user, splitting when clicks are at least N minutes apart
With a little Ruby and a credit card, you can have the answer in a few hours
class Reducer < Wukong::Streamer::AccumulatingReducer
  GAP = 0.5 # hours
  attr_accessor :sessions

  def start! *args
    @sessions = []
  end

  def accumulate user_id, date_str, page_name
    date = DateTime.parse(date_str)
    curr_session = sessions.last
    if curr_session.nil? || (date - curr_session[:end]) * 24 > GAP
      curr_session = { :start => date, :count => 0, :pages => Hash.new(0) }
      sessions << curr_session
    end
    curr_session[:end] = date
    curr_session[:count] += 1
    curr_session[:pages][page_name] += 1
  end

  def finalize
    sessions.each do |s|
      yield [key, s[:start], s[:end], …]
    end
  end
end
Gotchas Mappers run one per file – so upload data in chunks
Except LZO, which needs an index. Bzip splitting is around the corner?
Getting dependencies onto all the machines
Bootstrap scripts
Package files into the cache; they will be symlinked into the working directory
If your job crashes, you pay for the whole hour
Bootstrap Script
sudo apt-get update
sudo apt-get -y install ruby1.8-dev
wget https://jkk-plm.s3.amazonaws.com/bootstrap-deps/rubygems-1.7.2.tgz
tar xvfz rubygems-1.7.2.tgz
cd rubygems-1.7.2 && sudo ruby setup.rb
sudo gem install wukong json --no-rdoc --no-ri
Orchestration Examples still at the level of map-reduce
Sophisticated workflows have many maps and reduces
Growing selection of tools to move up a level of abstraction
Write a data flow, compile it down into a series of maps and reduces
Apache Pig Data types
Int/Float/Date/String
Tuple
Bag
High level operations to join, filter, sort and group data
Min/Max/Count
And you can still call out to scripting languages…
Handy tool even running on one machine
Haversine Distance, Pig
Other High Level Tools Hive: Like SQL
Cascading: Pipe metaphor. Java, but works with JRuby
Further Reading Data Recipes Blog thedatachef.blogspot.com
Other tools for Ruby, Python
MRTool
List on wukong github