Implementing Large-Scale Matrix Factorization on Apache Spark
Xiangrui Meng
AAAI 2017 Workshop on Distributed Machine Learning
About Me
• Software Engineer at Databricks • machine learning and data science/engineering
• Committer and PMC member of Apache Spark • MLlib, SparkR, PySpark, Spark Packages, etc
About Databricks
• Founded by the team who created Apache Spark
• Offers a hosted service
  • Apache Spark in the cloud
  • Cluster management
  • Interactive notebooks
  • Enterprise features
• Free Community Edition
  • http://databricks.com/try
Outline
• Apache Spark / MLlib
• Alternating Least Squares (ALS)
• Implementation of ALS and lessons learned
Apache Spark
The most active open-source project in big data
About Apache Spark
• General engine for large-scale data processing under the MapReduce framework
• Resilient Distributed Dataset (RDD) with in-memory & on-disk caching
• Concise APIs in Python, Scala, Java, and R
• Apache open source license
Spark
Spark SQL Streaming MLlib GraphX SparkR
Spark Architecture
[Figure: Spark architecture diagram with a Driver (R/Py, JVM), Workers (JVM, R/Py Backend), and Data Sources.]
Spark is Fast
Daytona Gray Sort: Time to sort 100TB
• 2013 Record: Hadoop, 2100 machines, 72 minutes
• 2014 Record: Spark, 207 machines, 23 minutes
Source: Daytona GraySort benchmark, sortbenchmark.org
2016 Cloud Sort Record: 100TB for $144.
Spark is Easy
“wordcount(wordcount) < 100”
sc.textFile("hdfs://...") \
  .flatMap(lambda line: line.split(" ")) \
  .map(lambda word: (word, 1)) \
  .reduceByKey(lambda a, b: a + b)
Spark is Easier with DataFrames
Compute average age per department
• df.groupBy("dept").avg("age")  # Python
• df.groupBy("dept").avg("age")  // Scala
• df.groupBy("dept").avg("age");  // Java
• avg(groupBy(df, "dept"), "age")  # R
• df %>% groupBy("dept") %>% avg("age")  # R with magrittr

+----+--------+
|dept|avg(age)|
+----+--------+
| eng|    25.0|
| ops|    30.0|
+----+--------+
Spark vs. MPI
• restricted but higher-level abstractions • Map/Reduce / Broadcast/Collect • RDD/DataFrame APIs
• built-in fault-tolerance • more data-centric than compute-centric
Spark MLlib
Large-scale machine learning on Apache Spark
About MLlib
• Started in UC Berkeley AMPLab • Shipped with Spark 0.8
• Currently (Spark 2.1) • Contributions from 100+ organizations, 300+ individuals • Good coverage of algorithms
MLlib’s Mission
MLlib’s mission is to make practical machine learning easy and scalable.
• Easy to build machine learning applications • Capable of learning from large-scale datasets
Algorithm Coverage
Classification • Logistic regression • Naive Bayes • Streaming logistic regression • Linear SVMs • Decision trees • Random forests • Gradient-boosted trees • Multilayer perceptron
Regression • Ordinary least squares • Ridge regression • Lasso • Isotonic regression • Decision trees • Random forests • Gradient-boosted trees • Survival regression • Streaming linear methods
Frequent pattern mining • FP-growth • PrefixSpan
Clustering • Gaussian mixture models • K-Means • Streaming K-Means • Latent Dirichlet Allocation • Power Iteration Clustering • Bisecting k-means
Statistics • Pearson correlation • Spearman correlation • Online summarization • Chi-squared test • Kernel density estimation • Kolmogorov–Smirnov test
Linear algebra • Local dense & sparse vectors & matrices • Distributed matrices (block-partitioned matrix, row matrix, indexed row matrix, coordinate matrix) • Matrix decompositions
Recommendation • Alternating Least Squares
Feature extraction & selection • Word2Vec • Chi-Squared selection • Hashing term frequency • Inverse document frequency • Normalizer • Standard scaler • Tokenizer • One-Hot Encoder • StringIndexer • VectorIndexer • VectorAssembler • Binarizer • Bucketizer • ElementwiseProduct • PolynomialExpansion
Model import/export
Pipelines
List based on Spark 1.6
Alternating Least Squares (ALS)
Collaborative filtering via matrix factorization
Collaborative Filtering
[Figure: A, a rating matrix, with users along one axis and items along the other.]
Low-Rank Assumption
• What kind of movies do you like?
• sci-fi / crime / action
Perception of preferences usually takes place in a low dimensional latent space.
So the rating matrix is approximately low-rank.
$$A \approx U V^T, \quad U \in \mathbb{R}^{m \times k}, \quad V \in \mathbb{R}^{n \times k}$$
$$a_{ij} \approx u_i^T v_j$$
Objective Function
• minimize the reconstruction error
• but only consider observed ratings,
• or treat missing values as implicit negative feedback
$$\text{minimize} \quad \frac{1}{2} \| A - U V^T \|_F^2$$
$$\text{minimize} \quad \frac{1}{2} \sum_{(i,j) \in \Omega} (a_{ij} - u_i^T v_j)^2$$
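As a minimal sketch of the second objective, the loss over observed ratings can be evaluated directly. This assumes NumPy and a hypothetical list of observed `(i, j, a_ij)` triples standing in for the set Ω of observed entries. The function name and the toy data are made up for illustration and do not come from the talk.

```python
import numpy as np

def observed_loss(ratings, U, V):
    """Return 0.5 * sum over observed (i, j) of (a_ij - u_i^T v_j)^2."""
    total = 0.0
    for i, j, a_ij in ratings:
        residual = a_ij - U[i] @ V[j]   # u_i and v_j are rows of U and V
        total += residual * residual
    return 0.5 * total

# Toy data (hypothetical): 2 users, 2 items, rank k = 1.
U = np.array([[1.0], [2.0]])
V = np.array([[3.0], [1.0]])
ratings = [(0, 0, 3.0), (1, 1, 1.0)]    # only the observed entries
loss = observed_loss(ratings, U, V)
```

Unobserved entries never appear in the sum, so the cost of one evaluation scales with the number of ratings rather than with m times n.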
Alternating Least Squares (ALS)
• If we fix U, the objective becomes convex and separable:
$$\text{minimize} \quad \frac{1}{2} \sum_j \left( \sum_{i,\,(i,j) \in \Omega} (a_{ij} - u_i^T v_j)^2 \right)$$
• Each sub-problem is a least squares problem, which can be solved in parallel. So we take alternating directions to minimize the objective:
  • fix U, solve for V;
  • fix V, solve for U.
Complexity
• To solve a least squares problem of size n-by-k, we need $O(n k^2)$ time. So the total computation cost is $O(\mathrm{nnz} \cdot k^2)$, where nnz is the total number of ratings.
• We take the normal equation approach in ALS (w/ regularization):
$$A^T A x = A^T b$$
• Solving each subproblem requires $O(k^2)$ storage. We call LAPACK's routine to solve this problem.
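The normal equation approach can be sketched in a few lines of NumPy. This is an illustration under assumptions, not MLlib's code: the function name is hypothetical, the regularization is a plain ridge term `reg * I` (the talk does not say how the regularizer is scaled), and NumPy's Cholesky routine stands in for the direct LAPACK call.

```python
import numpy as np

def solve_normal_equation(A, b, reg=0.0):
    """Solve min ||A x - b||^2 + reg * ||x||^2 via (A^T A + reg I) x = A^T b."""
    k = A.shape[1]
    gram = A.T @ A + reg * np.eye(k)   # k-by-k matrix: O(n k^2) time, O(k^2) storage
    rhs = A.T @ b
    L = np.linalg.cholesky(gram)       # gram = L L^T, requires positive definiteness
    # For brevity the two triangular systems go through the general solver.
    y = np.linalg.solve(L, rhs)
    return np.linalg.solve(L.T, y)

# Toy problem (hypothetical) with an exact solution x = [1, 2].
A = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
b = np.array([1.0, 2.0, 3.0])
x = solve_normal_equation(A, b)
```

Only the k-by-k Gram matrix and a length-k right-hand side are kept per sub-problem, which is where the $O(k^2)$ storage figure comes from.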
References
• Hu, Koren, and Volinsky, Collaborative filtering for implicit feedback datasets. 2008.
• Koren, Bell, and Volinsky, Matrix factorization techniques for recommender systems. 2009.
ALS Implementation in MLlib
How to scale to 100,000,000,000 ratings?
Numerical Computation in JVM
• Spark is implemented in Scala, a JVM language.
• The JVM was not designed for high-performance computation, though a careful implementation can achieve good performance.
  • List<Double> is much slower than double[].
  • Lack of scientific libraries.
  • Garbage collection.
  • …
• Accessing native BLAS/LAPACK via JNI.
[Figure: call stack from Scala/Java through the Java Native Interface (JNI) to BLAS/LAPACK.]
A Simple Parallel Implementation
• initialize user matrix U on the driver
• broadcast U to workers
• group ratings by items • solve each LS problem independently to compute item factors • collect item factors to the driver to assemble item matrix V
• broadcast V to workers • …
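The steps above can be sketched on a single machine with NumPy. Everything here is an assumption made for illustration: the function names are hypothetical, plain dictionaries stand in for the broadcast and collect steps, and the regularization is a simple ridge term. No Spark API is used.

```python
import numpy as np
from collections import defaultdict

def solve_factors(ratings_by_dst, src_factors, k, reg):
    """For each destination id, solve one regularized least squares problem."""
    out = {}
    for dst, pairs in ratings_by_dst.items():
        A = np.array([src_factors[src] for src, _ in pairs])  # n_dst-by-k
        b = np.array([a for _, a in pairs])
        out[dst] = np.linalg.solve(A.T @ A + reg * np.eye(k), A.T @ b)
    return out

def simple_als(ratings, k=2, reg=0.1, iters=10, seed=0):
    """ratings: iterable of (user, item, rating) triples."""
    rng = np.random.default_rng(seed)
    by_item, by_user = defaultdict(list), defaultdict(list)
    for u, i, a in ratings:
        by_item[i].append((u, a))   # group ratings by items
        by_user[u].append((i, a))   # group ratings by users
    U = {u: rng.standard_normal(k) for u in by_user}  # initialize U "on the driver"
    V = {}
    for _ in range(iters):
        V = solve_factors(by_item, U, k, reg)   # fix U, solve for V
        U = solve_factors(by_user, V, k, reg)   # fix V, solve for U
    return U, V

# Hypothetical toy ratings: (user, item, rating).
toy = [(0, 0, 5.0), (0, 1, 1.0), (1, 0, 4.0), (1, 1, 1.0), (2, 1, 5.0)]
U, V = simple_als(toy, k=2, reg=0.1, iters=5)
```

In the distributed version every pass of `solve_factors` implies shipping a full factor dictionary to the workers and back, which is the cost the following slides set out to reduce.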
Communication Cost
Collecting and broadcasting user/item factors is too expensive. To make ALS scale to billions of ratings and millions of users/items, we have to distribute ratings (A), user factors (U), and item factors (V).
How to distribute factors to save communication/shuffle cost?
• all-to-all
• block-to-block
Communication: All-to-All
• users: u1, u2, u3; items: v1, v2, v3, v4 • shuffle size: O(nnz k) (nnz: number of nonzeros, i.e., ratings) • sending the same factor multiple times
Communication: Block-to-Block
• OutBlocks (P1, P2) • for each item block, which user factors to send
• InBlocks (Q1, Q2)
• for each item, which user factors to use
Communication: Block-to-Block
• Shuffle size is significantly reduced.
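A rough way to see the reduction is to count how many user factor vectors get shipped under each scheme. The sketch below is an assumption-laden illustration: the rating pairs are made up, and items are assigned to blocks by a simple modulo, which is only one possible partitioner.

```python
def all_to_all_messages(ratings):
    """One user factor is shipped per rating: nnz vectors of length k."""
    return len(ratings)

def block_to_block_messages(ratings, num_item_blocks):
    """Each user factor is shipped at most once per destination item block."""
    return len({(user, item % num_item_blocks) for user, item in ratings})

# Hypothetical (user, item) pairs for 3 users and 4 items.
ratings = [(1, 1), (1, 2), (1, 3), (2, 1), (2, 3), (3, 2), (3, 4)]
sent_all_to_all = all_to_all_messages(ratings)
sent_block_to_block = block_to_block_messages(ratings, num_item_blocks=2)
```

The block-to-block count is bounded by the number of users times the number of item blocks, so it stops growing with nnz once every user already sends to every block.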
Shuffle Even Less Data
• use Int instead of Long for user/item IDs • use Float instead of Double for ratings
• cache two copies of ratings — InBlocks for users and InBlocks for items.
1.5D Partitioning
DAG Visualization of an ALS Job
[Figure: DAG of an ALS job. Preparation stage: ratingBlocks, userInBlocks, userOutBlocks, itemInBlocks, itemOutBlocks. Iteration stage: itemFactors 0, userFactors 1, itemFactors 1.]
Compressed Storage for InBlocks
Array of rating tuples
• huge storage overhead • high garbage collection (GC) pressure
[(v1, u1, a11), (v2, u1, a12), (v1, u2, a21), (v2, u2, a22), (v2, u3, a32)]
Compressed Storage for InBlocks
Three primitive arrays
• low GC pressure • constructing all sub-problems together
• $O(n_j k^2)$ storage
([v1, v2, v1, v2, v2], [u1, u1, u2, u2, u3], [a11, a12, a21, a22, a32])
Compressed Storage for InBlocks
Primitive arrays with items ordered:
• solving sub-problems in sequence: • $O(k^2)$ storage
• TimSort
([v1, v1, v2, v2, v2], [u1, u2, u1, u2, u3], [a11, a21, a12, a22, a32])
Compressed Storage for InBlocks
Compressed items:
• no duplicated items • map lookup for user factors
([v1, v2], [0, 2, 5], [u1, u2, u1, u2, u3], [a11, a21, a12, a22, a32])
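The compressed layout can be built from the rating tuples with one stable sort and one pass. This is a sketch under assumptions: the function name is hypothetical, Python lists stand in for the primitive arrays, and integer IDs with made-up rating values replace the symbolic v, u, and a entries of the slide example.

```python
def compress_in_block(rating_tuples):
    """Turn (item, user, rating) tuples into (items, ptrs, users, values)."""
    ordered = sorted(rating_tuples, key=lambda t: t[0])  # stable sort by item
    items, ptrs, users, values = [], [], [], []
    for item, user, value in ordered:
        if not items or items[-1] != item:
            items.append(item)
            ptrs.append(len(users))   # offset where this item's ratings start
        users.append(user)
        values.append(value)
    ptrs.append(len(users))           # end offset of the last item
    return items, ptrs, users, values

# The slide example with v1 = 1, v2 = 2 and rating a_ij encoded as 10*i + j.
tuples = [(1, 1, 11.0), (2, 1, 12.0), (1, 2, 21.0), (2, 2, 22.0), (2, 3, 32.0)]
items, ptrs, users, values = compress_in_block(tuples)

# Ratings of the idx-th item live in users[ptrs[idx]:ptrs[idx + 1]].
users_of_second_item = users[ptrs[1]:ptrs[2]]
```

Each sub-problem can then be assembled by walking one contiguous slice, so only one k-by-k system needs to be in memory at a time.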
Compressed Storage for InBlocks
Store block IDs and local indices instead of user IDs. For example, u3 is the first vector sent from P2.
Encode (block ID, local index) into an integer
• use higher bits for block ID
• use lower bits for local index
• works for ~4 billion unique users/items
01 | 00 0000 0000 0000
([v1, v2], [0, 2, 5], [0|0, 0|1, 0|0, 0|1, 1|0], [a11, a21, a12, a22, a32])
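The bit packing can be sketched as follows. The function names and the `local_index_bits` parameter are hypothetical. The 2-bit block ID and 14-bit local index mirror the 16-bit pattern shown above, whereas a real implementation would pack into a 32-bit Int and pick the split from the number of blocks.

```python
def encode(block_id, local_index, local_index_bits):
    """Pack the block ID into the higher bits and the local index into the lower bits."""
    assert 0 <= local_index < (1 << local_index_bits)
    return (block_id << local_index_bits) | local_index

def decode(encoded, local_index_bits):
    """Recover (block_id, local_index) from a packed integer."""
    mask = (1 << local_index_bits) - 1
    return encoded >> local_index_bits, encoded & mask

# u3 is the first vector sent from the second block: block ID 1, local index 0.
packed = encode(1, 0, local_index_bits=14)
bits = format(packed, "016b")
block_id, local_index = decode(packed, local_index_bits=14)
```

Looking up a user factor then becomes two array index operations instead of a hash map lookup keyed by user ID.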
Avoid Garbage Collection
We use specialized code to replace the following in order to avoid generating millions of tiny temporary objects:
• initial partitioning of ratings
    ratings.map { r =>
      ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
    }.aggregateByKey(new RatingBlockBuilder)(
        seqOp = (b, r) => b.add(r),
        combOp = (b0, b1) => b0.merge(b1.build()))
      .mapValues(_.build())
• map IDs to local indices
    dstIds.toSet.toSeq.sorted.zipWithIndex.toMap
Amazon Reviews Dataset
• Amazon Reviews: ~6.6 million users, ~2.2 million items, and ~30 million ratings
• Tested ALS on stacked copies on a 16-node m3.2xlarge cluster with rank=10, iter=10:
Storage Comparison
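+------------+---------+-------------+
|block       |Spark 1.2|Spark 1.3/1.4|
+------------+---------+-------------+
|userInBlock |    941MB|        277MB|
|userOutBlock|    355MB|         65MB|
|itemInBlock |   1380MB|        243MB|
|itemOutBlock|    119MB|         37MB|
+------------+---------+-------------+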
Spotify Dataset
• Spotify: 75+ million users and 30+ million songs
• Tested ALS on a subset with ~50 million users, ~5 million songs, and ~50 billion ratings.
  • thanks to Chris Johnson and Anders Arpteg
• 32 r3.8xlarge nodes (~$10/hr with spot instances)
• It took 1 hour to finish 10 iterations with rank 10.
  • 10 mins to prepare in/out blocks
  • 5 mins per iteration
ALS Implementation in MLlib
• 1.5D partitioning of ratings
• Duplicate and cache ratings to save communication
• Efficient CSC-like storage format
• Using primitive arrays to avoid JVM GC issues
• Native LAPACK calls
Future Directions
• Leverage code generation to replace the hand-written specialized code that avoids GC.
• Solve issues with really popular items.
• Use QR instead of Cholesky.
• Explore other recommendation algorithms, e.g., factorization machines.
Related Work
• Zadeh et al., Matrix Computations and Optimization in Apache Spark, KDD'16.
• Abuzaid et al., Yggdrasil: An Optimized System for Training Deep Decision Trees at Scale, NIPS'16.
Thank you.
• Spark: http://spark.apache.org
• Databricks Community Edition: http://databricks.com/try
• Databricks: http://databricks.com