Implementing Large-Scale Matrix Factorization on Apache Spark
Xiangrui Meng
AAAI 2017 Workshop on Distributed Machine Learning


Page 1:

Implementing Large-Scale Matrix Factorization on Apache Spark

Xiangrui Meng
AAAI 2017 Workshop on Distributed Machine Learning

Page 2:

About Me

• Software Engineer at Databricks
  • machine learning and data science/engineering

• Committer and PMC member of Apache Spark
  • MLlib, SparkR, PySpark, Spark Packages, etc.

2

Page 3:

About Databricks

• Founded by the team who created Apache Spark

• Offers a hosted service
  • Apache Spark in the cloud
  • Cluster management
  • Interactive notebooks
  • Enterprise features

• Free Community Edition
  • http://databricks.com/try

3

Page 4:

Outline

• Apache Spark / MLlib

• Alternating Least Squares (ALS)

• Implementation of ALS and lessons learned

4

Page 5:

Apache Spark

The most active open-source project in big data

Page 6:

Page 7:

About Apache Spark

7

• General engine for large-scale data processing under the MapReduce framework

• Resilient Distributed Dataset (RDD) with in-memory & on-disk caching

• Concise APIs in Python, Scala, Java, and R

• Apache open source license

[Stack diagram: the Spark core engine, with Spark SQL, Streaming, MLlib, GraphX, and SparkR on top]

Page 8:

Spark Architecture

8

[Architecture diagram: an R/Python process talks to the JVM driver through an R/Py backend; the driver coordinates JVM workers, which read from external data sources.]

Page 9:

Spark is Fast

Daytona GraySort: time to sort 100 TB

2013 record: Hadoop, 72 minutes on 2,100 machines

2014 record: Spark, 23 minutes on 207 machines

2016 CloudSort record: 100 TB for $144

Source: Daytona GraySort benchmark, sortbenchmark.org

9

Page 10:

Spark is Easy

Word count in fewer than 100 characters:

sc.textFile("hdfs://...") \
  .flatMap(lambda line: line.split(" ")) \
  .map(lambda word: (word, 1)) \
  .reduceByKey(lambda a, b: a + b)

10

Page 11:

Spark is Easier with DataFrames

Compute average age per department

• df.groupBy("dept").avg("age")          # Python
• df.groupBy("dept").avg("age")          // Scala
• df.groupBy("dept").avg("age");         // Java
• avg(groupBy(df, "dept"), "age")        # R
• df %>% groupBy("dept") %>% avg("age")  # R with magrittr

11

+----+--------+
|dept|avg(age)|
+----+--------+
| eng|    25.0|
| ops|    30.0|
+----+--------+

Page 12:

Spark vs. MPI

• restricted but higher-level abstractions
  • Map/Reduce, Broadcast/Collect
  • RDD/DataFrame APIs

• built-in fault-tolerance
• more data-centric than compute-centric

12

Page 13:

Spark MLlib

Large-scale machine learning on Apache Spark

Page 14:

About MLlib

• Started in UC Berkeley AMPLab
  • Shipped with Spark 0.8

• Currently (Spark 2.1)
  • Contributions from 100+ organizations, 300+ individuals
  • Good coverage of algorithms

14

Page 15:

MLlib’s Mission

MLlib’s mission is to make practical machine learning easy and scalable.

• Easy to build machine learning applications
• Capable of learning from large-scale datasets

15

Page 16:

Algorithm Coverage

• Classification
  • Logistic regression
  • Naive Bayes
  • Streaming logistic regression
  • Linear SVMs
  • Decision trees
  • Random forests
  • Gradient-boosted trees
  • Multilayer perceptron

• Regression
  • Ordinary least squares
  • Ridge regression
  • Lasso
  • Isotonic regression
  • Decision trees
  • Random forests
  • Gradient-boosted trees
  • Survival regression
  • Streaming linear methods

• Frequent pattern mining
  • FP-growth
  • PrefixSpan

• Clustering
  • Gaussian mixture models
  • K-Means
  • Streaming K-Means
  • Latent Dirichlet Allocation
  • Power Iteration Clustering
  • Bisecting k-means

• Statistics
  • Pearson correlation
  • Spearman correlation
  • Online summarization
  • Chi-squared test
  • Kernel density estimation
  • Kolmogorov–Smirnov test

• Linear algebra
  • Local dense & sparse vectors & matrices
  • Distributed matrices
    • Block-partitioned matrix
    • Row matrix
    • Indexed row matrix
    • Coordinate matrix
  • Matrix decompositions

• Recommendation
  • Alternating Least Squares

• Feature extraction & selection
  • Word2Vec
  • Chi-Squared selection
  • Hashing term frequency
  • Inverse document frequency
  • Normalizer
  • Standard scaler
  • Tokenizer
  • One-Hot Encoder
  • StringIndexer
  • VectorIndexer
  • VectorAssembler
  • Binarizer
  • Bucketizer
  • ElementwiseProduct
  • PolynomialExpansion

• Model import/export
• Pipelines

List based on Spark 1.6

16

Page 17:

Alternating Least Squares (ALS)

Collaborative filtering via matrix factorization

Page 18:

18

Collaborative Filtering

[Figure: the rating matrix A, with users as rows and items as columns]

A: a rating matrix

Page 19:

Low-Rank Assumption

• What kind of movies do you like?

• sci-fi / crime / action

Perception of preferences usually takes place in a low-dimensional latent space.

So the rating matrix is approximately low-rank.

19

$A \approx U V^T, \quad U \in \mathbb{R}^{m \times k}, \quad V \in \mathbb{R}^{n \times k}$

$a_{ij} \approx u_i^T v_j$

Page 20:

Objective Function

• minimize the reconstruction error

• but only consider observed ratings,

• or treat missing values as implicit negative feedback

20

$\text{minimize} \quad \tfrac{1}{2} \, \| A - U V^T \|_F^2$

$\text{minimize} \quad \tfrac{1}{2} \sum_{(i,j) \in \Omega} (a_{ij} - u_i^T v_j)^2$

Page 21:

Alternating Least Squares (ALS)

• If we fix U, the objective becomes convex and separable:

• Each sub-problem is a least squares problem, which can be solved in parallel. So we take alternating directions to minimize the objective:

• fix U, solve for V;
• fix V, solve for U.

21

$\text{minimize} \quad \tfrac{1}{2} \sum_j \Big( \sum_{i:\,(i,j) \in \Omega} (a_{ij} - u_i^T v_j)^2 \Big)$
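With U fixed, each per-item sub-problem has a standard closed-form least-squares solution; spelling it out (a routine step not shown on the slide, assuming the k-by-k Gram matrix is invertible):

$v_j = \Big( \sum_{i:\,(i,j)\in\Omega} u_i u_i^T \Big)^{-1} \sum_{i:\,(i,j)\in\Omega} a_{ij}\, u_i$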

Page 22:

Complexity

• To solve a least squares problem of size n-by-k, we need $O(n k^2)$ time. So the total computation cost is $O(\mathrm{nnz} \cdot k^2)$, where nnz is the total number of ratings.

• We take the normal equation approach in ALS (w/ regularization)

• Solving each subproblem requires $O(k^2)$ storage. We call LAPACK’s routine to solve this problem.

22

$A^T A \, x = A^T b$
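As an illustration of the normal-equation approach, here is a minimal single-machine sketch (my own code, not MLlib's; it assumes the Breeze library for dense linear algebra and adds the L2 regularization mentioned on the slide):

import breeze.linalg.{DenseMatrix, DenseVector}

// Solve one ALS sub-problem: the factor v_j of item j, given the factors of the users who rated it.
def solveItemFactor(
    userFactors: Array[DenseVector[Double]],  // u_i for all i with (i, j) in Omega
    ratings: Array[Double],                   // a_ij for the same users i
    k: Int,
    lambda: Double): DenseVector[Double] = {
  val ata = DenseMatrix.zeros[Double](k, k)   // accumulates A^T A; O(k^2) storage
  val atb = DenseVector.zeros[Double](k)      // accumulates A^T b
  for ((u, a) <- userFactors.zip(ratings)) {
    ata += u * u.t                            // rank-1 update u_i u_i^T
    atb += u * a
  }
  // Regularize and solve the k-by-k system; MLlib hands this dense solve to LAPACK.
  (ata + DenseMatrix.eye[Double](k) * lambda) \ atb
}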

Page 23:

References

• Hu, Koren, and Volinsky. Collaborative Filtering for Implicit Feedback Datasets. 2008.

• Koren, Bell, and Volinsky. Matrix Factorization Techniques for Recommender Systems. 2009.

23

Page 24:

ALS Implementation in MLlib

How to scale to 100,000,000,000 ratings?

Page 25:

Numerical Computation in JVM

• Spark is implemented in Scala, a JVM language.

• The JVM was not designed for high-performance computation, though a careful implementation can achieve good performance.
  • List<Double> is much slower than double[].
  • Lack of scientific libraries.
  • Garbage collection.
  • …

• Accessing native BLAS/LAPACK via JNI.

25

[Diagram: Scala/Java calls BLAS/LAPACK through the Java Native Interface (JNI).]
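A small illustration of the List<Double> vs. double[] bullet (my own example, not from the deck): summing a boxed list allocates and unboxes one object per element, while a primitive Array[Double] is a flat double[] that the JIT compiles into a tight loop.

val n = 1 << 22
val boxed = new java.util.ArrayList[java.lang.Double](n)
(0 until n).foreach(i => boxed.add(i.toDouble))        // one heap object per element
val primitive = Array.tabulate(n)(_.toDouble)          // one flat double[] of 8 * n bytes

var boxedSum = 0.0
val it = boxed.iterator()
while (it.hasNext) boxedSum += it.next()               // unboxing + pointer chasing per element

var primSum = 0.0
var i = 0
while (i < primitive.length) { primSum += primitive(i); i += 1 }  // primitive, cache-friendly loop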

Page 26:

A Simple Parallel Implementation

• initialize user matrix U on the driver

• broadcast U to workers

• group ratings by item
• solve each least-squares problem independently to compute item factors
• collect item factors to the driver to assemble item matrix V

• broadcast V to workers
• …

26
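A hedged sketch of this naive scheme on the RDD API (my own toy code, not MLlib's implementation; solve is a stand-in for the per-item least-squares solver):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class Rating(user: Int, item: Int, rating: Float)

// One half-iteration: given all user factors on the driver, compute all item factors.
def naiveHalfIteration(
    sc: SparkContext,
    ratings: RDD[Rating],
    userFactors: Map[Int, Array[Double]],                  // U, assembled on the driver
    solve: Seq[(Array[Double], Double)] => Array[Double]   // stand-in per-item LS solver
  ): Map[Int, Array[Double]] = {
  val bcU = sc.broadcast(userFactors)                      // broadcast U to every worker
  val itemFactors = ratings
    .groupBy(_.item)                                       // group ratings by item
    .mapValues { rs =>
      solve(rs.toSeq.map(r => (bcU.value(r.user), r.rating.toDouble)))
    }
    .collect()                                             // item matrix V back on the driver
    .toMap
  bcU.destroy()
  itemFactors
}

Every half-iteration collects the new factors to the driver and re-broadcasts the other factor matrix, which is exactly the communication pattern the next slide identifies as too expensive.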

Page 27:

Communication Cost

Collecting/broadcasting user/item factors is too expensive. To make ALS scale to billions of ratings and millions of users/items, we have to distribute the ratings (A), the user factors (U), and the item factors (V).

How to distribute factors to save communication/shuffle cost?

• all-to-all

• block-to-block

27

Page 28:

Communication: All-to-All

• users: u1, u2, u3; items: v1, v2, v3, v4
• shuffle size: $O(\mathrm{nnz} \cdot k)$ (nnz: number of nonzeros, i.e., ratings)
• sending the same factor multiple times

28
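As a rough, hedged illustration using the Spotify-scale numbers quoted later in the deck (nnz ≈ 50 billion ratings, rank k = 10, double-precision factors): the all-to-all scheme shuffles on the order of $\mathrm{nnz} \cdot k \cdot 8 \text{ bytes} \approx 4$ TB of factor data per half-iteration, because each user factor is re-sent once per rating made by that user.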

Page 29:

Communication: Block-to-Block

• OutBlocks (P1, P2)
  • for each item block, which user factors to send

• InBlocks (Q1, Q2)
  • for each item, which user factors to use

29

Page 30:

Communication: Block-to-Block

• Shuffle size is significantly reduced.

30

Page 31:

Shuffle Even Less Data

• use Int instead of Long for user/item IDs
• use Float instead of Double for ratings

31
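A hedged back-of-the-envelope: a (Long user, Long item, Double rating) triple carries 24 bytes of payload, while (Int, Int, Float) carries 12, so this change alone roughly halves the bytes cached and shuffled per rating, at the cost of capping IDs at about 2 billion distinct values and storing ratings in single precision.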

Page 32:

Shuffle Even Less Data

• use Int instead of Long for user/item IDs
• use Float instead of Double for ratings

• cache two copies of ratings — InBlocks for users and InBlocks for items.

32

Page 33:

1.5D Partitioning

33

Page 34:

DAG Visualization of an ALS Job

34

[DAG visualization: during preparation, ratingBlocks feed userInBlocks / userOutBlocks and itemInBlocks / itemOutBlocks; the iterations then alternate itemFactors 0 → userFactors 1 → itemFactors 1 → …]

Page 35:

Compressed Storage for InBlocks

Array of rating tuples

• huge storage overhead
• high garbage collection (GC) pressure

35

[(v1, u1, a11), (v2, u1, a12), (v1, u2, a21), (v2, u2, a22), (v2, u3, a32)]

Page 36:

Compressed Storage for InBlocks

Three primitive arrays

• low GC pressure
• constructing all sub-problems together

• $O(n_j k^2)$ storage

36

([v1, v2, v1, v2, v2], [u1, u1, u2, u2, u3], [a11, a12, a21, a22, a32])

Page 37:

Compressed Storage for InBlocks

Primitive arrays with items ordered:

• solving sub-problems in sequence
  • $O(k^2)$ storage

• TimSort

37

([v1, v1, v2, v2, v2], [u1, u2, u1, u2, u3], [a11, a21, a12, a22, a32])

Page 38:

Compressed Storage for InBlocks

Compressed items:

• no duplicated items
• map lookup for user factors

38

([v1, v2], [0, 2, 5], [u1, u2, u1, u2, u3], [a11, a21, a12, a22, a32])
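A toy Scala rendering of this compressed layout (my own illustration with placeholder rating values, not MLlib's data structures): the second array acts as a pointer array, so the ratings of the j-th unique item occupy positions ptrs(j) until ptrs(j + 1), and iterating over all ratings of one item is a contiguous index range.

val uniqueItemIds = Array("v1", "v2")
val ptrs          = Array(0, 2, 5)                       // item j owns entries ptrs(j) until ptrs(j + 1)
val userIds       = Array("u1", "u2", "u1", "u2", "u3")
val ratings       = Array(11.0, 21.0, 12.0, 22.0, 32.0)  // a11, a21, a12, a22, a32 (placeholder values)

// Iterate over all ratings of item v2 (j = 1) without creating any per-rating objects.
val j = 1
var p = ptrs(j)
while (p < ptrs(j + 1)) {
  println(s"item ${uniqueItemIds(j)} rated by ${userIds(p)} with ${ratings(p)}")
  p += 1
}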

Page 39:

Compressed Storage for InBlocks

Store block IDs and local indices instead of user IDs. For example, u3 is the first vector sent from P2.

Encode (block ID, local index) into an integer

• use higher bits for block ID

• use lower bits for local index

• works for up to ~4 billion unique users/items

01 | 00 0000 0000 0000

39

([v1, v2], [0, 2, 5], [0|0, 0|1, 0|0, 0|1, 1|0], [a11, a21, a12, a22, a32])
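A hedged sketch of the (block ID | local index) packing described above, written from the slide's description (MLlib's actual implementation may differ in detail):

// Pack (blockId, localIndex) into one Int: high bits hold the block ID, low bits the local index.
class LocalIndexEncoder(numBlocks: Int) {
  require(numBlocks > 0)
  // Bits left over for the local index once the block ID takes the high bits.
  private val localIndexBits =
    math.min(java.lang.Integer.numberOfLeadingZeros(numBlocks - 1), 31)
  private val localIndexMask = (1 << localIndexBits) - 1

  def encode(blockId: Int, localIndex: Int): Int = (blockId << localIndexBits) | localIndex
  def blockId(encoded: Int): Int = encoded >>> localIndexBits
  def localIndex(encoded: Int): Int = encoded & localIndexMask
}

// Example from the slide: u3 is the first vector sent from P2 (block 1, local index 0).
val enc = new LocalIndexEncoder(numBlocks = 2)
val u3  = enc.encode(blockId = 1, localIndex = 0)
assert(enc.blockId(u3) == 1 && enc.localIndex(u3) == 0)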

Page 40:

Avoid Garbage Collection

We use specialized code to replace the following, in order to avoid generating millions of tiny temporary objects:

• initial partitioning of ratings

  ratings.map { r =>
    ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
  }.aggregateByKey(new RatingBlockBuilder)(
    seqOp = (b, r) => b.add(r),
    combOp = (b0, b1) => b0.merge(b1.build()))
    .mapValues(_.build())

• map IDs to local indices

  dstIds.toSet.toSeq.sorted.zipWithIndex.toMap

40

Page 41:

Amazon Reviews Dataset

• Amazon Reviews: ~6.6 million users, ~2.2 million items, and ~30 million ratings

• Tested ALS on stacked copies of the dataset on a 16-node m3.2xlarge cluster with rank=10, iter=10

41

Page 42:

Storage Comparison

42

               Spark 1.2   Spark 1.3/1.4
userInBlock      941 MB        277 MB
userOutBlock     355 MB         65 MB
itemInBlock     1380 MB        243 MB
itemOutBlock     119 MB         37 MB

Page 43:

Spotify Dataset

• Spotify: 75+ million users and 30+ million songs

• Tested ALS on a subset with ~50 million users, ~5 million songs, and ~50 billion ratings
  • thanks to Chris Johnson and Anders Arpteg

• 32 r3.8xlarge nodes (~$10/hr with spot instances)

• It took 1 hour to finish 10 iterations with rank 10.
  • 10 mins to prepare in/out blocks
  • 5 mins per iteration

43

Page 44:

ALS Implementation in MLlib

• 1.5D partitioning of ratings

• Duplicate and cache ratings to save communication

• Efficient CSC-like storage format
• Using primitive arrays to avoid JVM GC issues
• Native LAPACK calls

44

Page 45:

Future Directions

• Leverage code generation to replace the hand-written specialized code that avoids GC.
• Solve issues with really popular items.
• Use QR instead of Cholesky.
• Explore other recommendation algorithms, e.g., factorization machines.

45

Page 46:

Related Work

• Zadeh et al. Matrix Computations and Optimization in Apache Spark. KDD '16.

• Abuzaid et al. Yggdrasil: An Optimized System for Training Deep Decision Trees at Scale. NIPS '16.

46

Page 47:

Thank you.

• Spark: http://spark.apache.org
• Databricks Community Edition: http://databricks.com/try
• Databricks: http://databricks.com