Hadoop and beyond: power tools for data mining
DESCRIPTION
A brief survey of great tools for dealing with big datasets. Given as an invited lecture for students taking the Cloud Computing module at Birkbeck and UCL.

TRANSCRIPT
Hadoop and beyond: power tools for data mining
Mark Levy, 13 March 2013
Cloud Computing Module
Birkbeck/UCL
Hadoop and beyond
Outline:
• the data I work with
• Hadoop without Java
• Map-Reduce unfriendly algorithms
• Hadoop without Map-Reduce
• alternatives in the cloud
• alternatives on your laptop

NB
• all software mentioned is Open Source
• won't cover key-value stores
• I don't use all of these tools
Last.fm: scrobbling
Last.fm: tagging
Last.fm: personalised radio
Last.fm: recommendations
Last.fm datasets
Core datasets:
• 45M users, many active
• 60M artists
• 100M audio fingerprints
• 600M tracks (hmm...)
• 19M physical recordings
• 3M distinct tags
• 2.5M <user,item,tag> taggings per month
• 1B <user,time,track> scrobbles per month
• full user-track graph has ~50B edges (more often work with ~500M edges)
Problem Scenario 1
Need Hadoop, don't want Java:
• need to build prototypes, fast
• need to do interactive data analysis
• want terse, highly readable code to:
  • improve maintainability
  • improve correctness
Hadoop without Java
Some options:
• Hive (Facebook)
• Pig (Yahoo!)
• Cascading (ok, it's still Java...)
• Scalding (Twitter)
• Hadoop streaming (various)
not to mention 11 more listed here: http://blog.matthewrathbone.com/2013/01/05/a-quick-guide-to-hadoop-map-reduce-frameworks.html
Apache Hive
SQL access to data on Hadoop
pros:
• minimal learning curve
• interactive shell
• easy to check correctness of code
cons:
• can be inefficient
• hard to fix when it is
Word count in Hive
CREATE TABLE input (line STRING);
LOAD DATA LOCAL INPATH '/input' OVERWRITE INTO TABLE input;

SELECT word, COUNT(*) FROM input
LATERAL VIEW explode(split(line, ' ')) wTable AS word
GROUP BY word;
[but would you use SQL to count words?]
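One way to run it (assuming the statements above are saved in a file called wordcount.hql):

hive -f wordcount.hql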
Apache Pig
High-level scripting language for Hadoop
pros:
• more primitive operations than Hive (and UDFs)
• more flexible than Hive
• interactive shell
cons:
• harder learning curve than Hive
• tempting to write longer programs, but no code modularity beyond functions
Word count in Pig
A = load '/input';
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\\w+';
D = group C by word;
E = foreach D generate COUNT(C), group;
store E into '/output/wordcount';
[ apply operations to "relations" (tuples) ]
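One way to run it (assuming the script above is saved as wordcount.pig; add -x local to test against local files first):

pig wordcount.pig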
Cascading
Java data pipelining for Hadoop
pros:
• as flexible as Pig
• uses a real programming language
• ideal for longer workflows
cons:
• new concepts to learn ("spout", "sink", "tap", ...)
• still verbose (full word count example code > 150 lines)
Word count in Cascading
Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new Hfs(sourceScheme, "/input");
Scheme sinkScheme = new TextLine(new Fields("word", "count"));
Tap sink = new Hfs(sinkScheme, "/output/wordcount", SinkMode.REPLACE);

Pipe assembly = new Pipe("wordcount");
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator(new Fields("word"), regex);
assembly = new Each(assembly, new Fields("line"), function);
assembly = new GroupBy(assembly, new Fields("word"));
Aggregator count = new Count(new Fields("count"));
assembly = new Every(assembly, count);

Properties properties = new Properties();
FlowConnector.setApplicationJarClass(properties, Main.class);
FlowConnector flowConnector = new FlowConnector(properties);
Flow flow = flowConnector.connect("word-count", source, sink, assembly);
flow.complete();
Scalding
Scala data pipelining for Hadoop
pros:
• as flexible as Pig
• uses a real programming language
• much terser than Java
cons:
• community still small (but in use at Twitter)
• ???
Word count in Scalding
import com.twitter.scalding._

class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => line.split("""\s+""") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}
[and a one-liner to run it]
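For example, an assumed invocation via Scalding's scald.rb helper script (the paths are illustrative):

scripts/scald.rb --local WordCountJob.scala --input /input --output /output/wordcount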
Hadoop streaming
Map-reduce in any language, e.g. the Dumbo wrapper for Python
pros:
• use your favourite language for map-reduce
• easy to mix local and cloud processing
cons:
• limited community
• limited functionality beyond map-reduce
Word count in Dumbo
def map(key, text):
    for word in text.split():  # ignore key
        yield word, 1

def reduce(word, counts):
    yield word, sum(counts)

import dumbo
dumbo.run(map, reduce, combiner=reduce)
[and a one-liner to run it]
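For example (assuming the code above is saved as wordcount.py; the Hadoop path is illustrative):

dumbo start wordcount.py -hadoop /usr/lib/hadoop -input /input -output /output/wordcount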
Problem Scenario 1b
Need Hadoop, don't want Java:
• drive native code in parallel
E.g. audio analysis for:
• beat locations, bpm
• key estimation
• chord sequence estimation
• energy
• music/speech?
• ...
Audio Analysis
Problem:
• millions of audio tracks on own dfs
• long-running C++ analysis code
• depends on numerous libraries
• verbose output

Audio Analysis
Solution: bash + Dumbo Hadoop streaming
Outline:
• build C++ code
• zip up binary and libs
• send zipfile and some track IDs to each machine
• extract and run binary in a map task with subprocess.Popen()
Audio Analysis
import subprocess

class AnalysisMapper:
    def __init__(self):
        # unpack the shipped binary and its libraries
        extract("analyzer.tar.bz2", "bin")

    def map(self, key, trackID):
        file = fetch_audio_file(trackID)
        proc = subprocess.Popen(
            ["bin/analyzer", file],
            stdout=subprocess.PIPE)
        (out, err) = proc.communicate()
        yield trackID, out
Problem Scenario 2
Map-reduce unfriendly computation:
• iterative algorithms on the same data
• huge mapper output ("map-increase")
• curse of the slowest reducer
Graph Recommendations
Random walk on user-item graph
[Figure: user-track graph with a user node U, a track node t, and weighted edges between them]
Many short routes from U to t ⇒ recommend!
Graph Recommendations
Random walk is equivalent to:
• Label Propagation (Baluja et al., 2008)
• belongs to a family of algorithms that are easy to code in map-reduce
Label Propagation
User-track graph, edge weights = scrobbles:
[Figure: bipartite graph of user nodes U, V, W, X and track nodes a-f, with scrobble counts as edge weights]
Label Propagation
User nodes are labelled with scrobbled tracks:
[Figure: the same graph with an initial label distribution at each user node: (a,0.2)(b,0.4)(c,0.4); (b,0.5)(d,0.5); (b,0.2)(d,0.3)(e,0.5); (a,0.3)(d,0.3)(e,0.4)]
Label Propagation
Propagate, accumulate, normalise:
1 × (b,0.5),(d,0.5)
3 × (b,0.2),(d,0.3),(e,0.5)
⇒ (b,0.37),(d,0.47),(e,0.17)
next iteration e will propagate to user V
[Figure: the same graph showing the accumulated label distribution arriving at one node]
Label Propagation
After some iterations:
• labels at item nodes = similar items
• new labels at user nodes = recommendations
This is a general approach, assuming:
• no global state
• state at a node is recomputed from scratch from incoming messages on each iteration
Other examples (see the sketch below):
• breadth-first search
• page rank
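For instance, a minimal sketch of one page rank iteration in this same message-passing style, assuming Dumbo-style records of (nodeID, (rank, adjacency_list)); the record layout and the damping constant are assumptions, not from the talk:

def map(nodeID, value):
    # value = (rank, adjacency list); re-send the graph
    # structure so the reducer can rebuild the node
    rank, adj_list = value
    yield nodeID, ("graph", adj_list)
    for neighbour in adj_list:
        # give each neighbour an equal share of our rank
        yield neighbour, ("rank", rank / len(adj_list))

def reduce(nodeID, msgs):
    # accumulate incoming rank shares, recover structure
    adj_list, total = [], 0.0
    for kind, payload in msgs:
        if kind == "graph":
            adj_list = payload
        else:
            total += payload
    # damped update with the usual 0.85 damping factor
    yield nodeID, (0.15 + 0.85 * total, adj_list)

Note that the adjacency list has to be re-sent through the shuffle on every iteration, which is exactly the inefficiency discussed in a moment.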
Map-Reduce Graph Algorithms
inputs:
• adjacency lists, state at each node
output:
• updated state at each node
[Figure: node U with weighted edges to tracks a, b and c, stored as the record U,[(a,2),(b,4),(c,4)], i.e. the adjacency list for node U]
Label Propagation
class PropagatingMapper:
    def map(self, nodeID, value):
        # value holds label-weight pairs
        # and the adjacency list for the node
        labels, adj_list = value
        for node, weight in adj_list:
            # send a "stripe" of label-weight
            # pairs to each neighbouring node
            msg = [(label, prob * weight)
                   for label, prob in labels]
            yield node, msg

Label Propagation
from collections import defaultdict

class Reducer:
    def reduce(self, nodeID, msgs):
        # accumulate
        labels = defaultdict(lambda: 0)
        for msg in msgs:
            for label, w in msg:
                labels[label] += w
        # normalise, prune
        normalise(labels, MAX_LABELS_PER_NODE)
        yield nodeID, labels
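The normalise() helper is left undefined on the slide; a minimal sketch of what it might do, assuming it prunes to the top max_labels labels and rescales the kept weights to sum to 1:

def normalise(labels, max_labels):
    # keep only the highest-weighted labels
    top = sorted(labels.items(),
                 key=lambda kv: kv[1],
                 reverse=True)[:max_labels]
    total = sum(w for _, w in top)
    labels.clear()
    for label, w in top:
        # rescale so the kept weights sum to 1
        labels[label] = w / total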
Label Propagation
Not map-reduce friendly:
• send the graph over the network on every iteration
• huge mapper output:
  • mappers soon send MAX_LABELS_PER_NODE updates along every edge
• some reducers receive huge input:
  • too slow if the reducer streams the data, OOM otherwise
  • NB can't partition real graphs to avoid this: many natural graphs are scale-free, e.g. in the AltaVista web graph the top 1% of nodes are adjacent to 53% of edges
Problem Scenario 2b
Map-reduce unfriendly computation:
• shared memory
Examples:
• almost all machine learning:
  • split training examples between machines
  • all machines need to read/write many shared parameter values
Hadoop without map-reduce
Graph processing:
• Apache Giraph (Facebook)
Hadoop YARN:
• Knitting Boar, Iterative Reduce
http://www.cloudera.com/content/cloudera/en/resources/library/hadoopworld/strata-hadoop-world-2012-knitting-boar_slide_deck.html
• ???
Alternatives in the cloud
Graph processing:
• GraphLab (CMU)
Task-specific:
• Yahoo! LDA
General:
• HPCC
• Spark (Berkeley)
Spark and Shark
In-memory cluster computing
pros:
• fast!! (Shark is claimed to be up to 100x faster than Hive)
• code in Scala or Java or Python
• can run on Hadoop YARN or Apache Mesos
• ideal for iterative algorithms, nearline analytics
• includes a Pregel clone & stream processing
cons:
• hardware requirements???
GraphLab
Distributed graph processing
pros:
• vertex-centric programming model
• handles true web-scale graphs
• many toolkits already:
  • collaborative filtering, topic modelling, graphical models, machine vision, graph analysis
cons:
• new applications require non-trivial C++ coding
Word count in Spark
val file = spark.textFile("hdfs://input")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://output/wordcount")
Logistic regression in Spark
val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D)  // current separating plane
for (i <- 1 to ITERATIONS) {
  val gradient = points.map(p =>
    (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}
println("Final separating plane: " + w)
[ points remain in memory for all iterations ]
Alternatives on your laptop
Graph processing:
• GraphChi (CMU)
Machine learning:
• sofia-ml (Google)
• vowpal wabbit (Yahoo!, Microsoft)
GraphChi
Graph processing on your laptop
pros:
• still handles graphs with billions of edges
• graph structure can be modified at runtime
• Java/Scala ports under active development
• some toolkits available:
  • collaborative filtering, graph analysis
cons:
• existing C++ toolkit code is hard to extend
vowpal wabbit
classification, regression, LDA, bandits, ...
pros:
• handles huge ("terafeature") training datasets
• very fast
• state-of-the-art algorithms
• can run in distributed mode on Hadoop streaming
cons:
• hard-core documentation
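For flavour, a typical invocation might look like the following; the file names and feature names are illustrative, not from the talk. Each input line holds a label and sparse features, e.g. "1 | tempo:0.7 energy:0.9".

vw train.vw --loss_function logistic -f model.vw
vw test.vw -t -i model.vw -p predictions.txt

The first line trains a logistic model and saves it; the second loads the model in test-only mode and writes predictions.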
Take homes
Think before you use Hadoop:
• use your laptop for most problems
• use a graph framework for graph data
Keep your Hadoop code simple:
• if you're just querying data, use Hive
• if not, use a workflow framework
Check out the competition:
• Spark and HPCC look impressive