real-time recommendations on spark - meetupfiles.meetup.com/14077672/150519_sparkmeetup... · •...
TRANSCRIPT
DC Spark Interactive Meetup East
May 19 2015
Real-time Recommendations on Spark
Jan Neumann, Sridhar Alla (Comcast Labs)
Who am I?
• Jan Neumann, Lead of Big Data and Content Analysis Research Teams
• This is joint work with Sridhar Alla, Director/Big Data Architect, Enterprise BI
• Comcast Labs DC Responsibilities
• Develop Content Discovery and Metadata Back-end Services for Comcast
• Innovation
• Voice Interface for TV
• Machine Learning/Data Science Expertise for all of Comcast
Algorithmic Menus
Poster Art
Search
Video On-Demand
Live TV
Comcast Labs DC powers all CONTENT DISCOVERY for X1
Personalized
Recommendations
Who are we similar to?
METADATA LIKE SEARCH LIKE RECOMMENDATIONS LIKE
Powering millions of devices Taking into account your TV
channels, subscriptions and
tastes
Including live programming
CONTENTINFORMATION
CONTENT
IMAGES
LOGOS
SUBSCRIBERINFORMATION
CATALOGS
ENTITLEMENTS
CHANNEL LINEUPS
DISCOVERY
MENU
RECOMMEND
BROWSE
PERSONALIZE
VOICE CONTROL
SEARCH
MILLIONS OF DEVICES
METADATA
PROVIDERS
CONTENT
PROVIDERS
BILLING
SYSTEMS
CUSTOMER
USAGE
PURCHASES
How it all worksFormed in 2011, we now operate one of the largest and most sophisticated metadata and content discovery platforms in the industry.
DEEP METADATA
What are recommendation systems?
• Recommendation systems (RS) are everywhere
• Video (Comcast, Netflix)
• Music (Apple Genius, Spotify, Pandora)
• Products (Amazon, Ebay)
• Targeted Advertisements
• …
Items
Search Recommendations
What are recommendation systems?
• Recommendation systems (RS) are everywhere
• Video (Comcast, Netflix)
• Music (Apple Genius, Spotify, Pandora)
• Products (Amazon, Ebay)
• Targeted Advertisements
• …
• RS help to match users with items
• Ease information overload - long-tail
• Sales assistance (guidance, advisory,
persuasion,…)
Items
Search Recommendations
What are recommendation systems?
• Recommendation systems (RS) are everywhere
• Video (Comcast, Netflix)
• Music (Apple Genius, Spotify, Pandora)
• Products (Amazon, Ebay)
• Targeted Advertisements
• …
• RS help to match users with items
• Ease information overload - long-tail
• Sales assistance (guidance, advisory,
persuasion,…)
• Different system designs / paradigms
• Based on availability of exploitable data
• Implicit and explicit user feedback
• Domain characteristics
Items
Search Recommendations
Importance of recommending from the long tail
Content-based Recommendations
• Main idea: Recommend items to customer x similar to previous items rated highly
by x
Example:
• Movie recommendations
• Recommend movies with same actor(s),
director, genre, …
• Websites, blogs, news
• Recommend other sites with “similar” content
From J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
Pros: Content-based Approach
• +: No need for data on other users
• No cold-start or sparsity problems
• +: Able to recommend to users with
unique tastes
• +: Able to recommend new & unpopular items
• No first-rater problem
• +: Able to provide explanations
• Can provide explanations of recommended items by listing content-
features that caused an item to be recommended
From J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
Cons: Content-based Approach
• –: Finding the appropriate features is hard
• E.g., images, movies, music
• –: Recommendations for new users
• How to build a user profile?
• –: Overspecialization
• Never recommends items outside user’s
content profile
• People might have multiple interests
• Unable to exploit quality judgments of other users
From J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
Collaborative Filtering: Example
• Customer Y
• Does search on Metallica
• RS suggests Megadeth from data
collected about customer X
13
• Customer X
• Buys Metallica CD
• Buys Megadeth CD
From J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
Collaborative Filtering
From J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
Pros/Cons of Collaborative Filtering
• + Works for any kind of item
• No feature selection needed
• - Cold Start:
• Need enough users in the system to find a match
• - Sparsity:
• The user/ratings matrix is sparse
• Hard to find users that have rated the same items
• - First rater:
• Cannot recommend an item that has not been previously rated
• New items, Esoteric items
• - Popularity bias:
• Cannot recommend items to someone with unique taste
• Tends to recommend popular items
From J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
This Talk: Real-time TV Recommendations
= “Trending For You”
+
How can we do it?
• Challenges
• We have millions of users, thousands of programs
• Programs on live TV are constantly changing (Cold-Start)
• Approach inspired by “Google news personalization: scalable online
collaborative filtering”, Das et al., 2007
• Recommend what people in your geographic area with a taste
similar to you are currently watching …
• and do it in Spark.
Real-time Recommendations Algorithm
• Cluster user by taste profiles and geographic proximity
• Calculate Top K trending programs for each cluster
• Look up cluster for user and return trending programs
User->Cluster
Cluster -> Trending Programs
User -> Trending Programs
User Request
Real-time Recommendations in Spark
• Thanks to Spark we can implement this quickly
Live Tune
Activity via
Kafka
Batch:
User Clustering with
MlLib
Real-time:
TopK Trending Programs
per Cluster w/ Spark
Streaming
Real-time Program
recommendations per
user
User History
from HDFS
Collaborative Filtering Implementation: Matrix Factorization
Collaborative Filtering Implementation: Matrix Factorization
https://github.com/amplab/datascience-sp14/blob/master/hw3/hw3-part1.ipynb
No Ratings: Implicit Matrix Factorization
• ALS.trainImplicit(view_count,k,numIter,alpha,lambda)
• For more info see Music Recommendations with Spark, Chris Johnson
(Spotify), Spark Summit 2014
24 0 1430 71 091 69 87330 0 00 0 2
1 0 10 1 01 1 11 0 00 0 1
−
𝑈1𝑈2𝑈3𝑈4𝑈5
𝑀1 𝑀2 𝑀3
1.3 1 4.31 2.1 13.1 2 37 1 11 1 1
∗
View Count in% watched
Confidence= f(view count)
PreferenceMatrix
User Matrix
#users*k
MovieMatrix
k*#movies
min
Math Detour : Cluster Normalized User Vectors
• Spark KMeans can only cluster points in Euclidean space
• Cannot cluster preference vectors directly:
• 𝑃1 − 𝑃22 = 𝑈1𝑀 − 𝑈2𝑀
2 = (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 ≠ 𝑈1 − 𝑈2
2
D
𝑀𝑇
X
𝑋𝑇
𝑌𝑇
Batch: Cluster Users based on their Tastes
• Group users by geographic area
• Compute user taste vector from viewing history for each geographic area
• Cluster users to find groups with similar tastes
Usage history
aggregation
SVD for Dimensionality
Reduction
KmeansClustering of Users
User->Cluster
Implicit Matrix Factorization
Batch Implementation
# for each geographic region
# convert user viewing history to ratings (hash user_id to int)
val user_history = sc.textFile(“user_history.dat”)
val ratings = user_history.flat_map(parse_ratings)
Batch Implementation
# for each geographic region
# convert user viewing history to ratings (hash user_id to int)
val user_history = sc.textFile(“user_history.dat”)
val ratings = user_history.flat_map(parse_ratings)
# build matrix factorization model
val mf_model = ALS.train_implicit(ratings, rank, n, lambda, alpha)
Batch Implementation
# transform the movie feature matrix
val productRows = mf_model.productFeatures.map(s=>Vectors.dense(s._2))
val productRowMatrix = new RowMatrix(productRows)
val productSVD = productRowMatrix.computeSVD(svdRank)
val userFeatures = userRowMatrix.multiply(productSVD.V)
.multiply(Matrices.diag(productSVD.s))
Batch Implementation
# transform the movie feature matrix
val productRows = mf_model.productFeatures.map(s=>Vectors.dense(s._2))
val productRowMatrix = new RowMatrix(productRows)
val productSVD = productRowMatrix.computeSVD(svdRank)
val userFeatures = userRowMatrix.multiply(productSVD.V)
.multiply(Matrices.diag(productSVD.s))
# use latent taste space to cluster users
val cluster_model = KMeans(userFeatures.rows,numClusters,numIter)
Real-time Data Ingest
• Back-end servers log user interactions
with STB
• Data is processed and formatted using
Flume
• Data is then passed on to
• Real-time Process using Storm
• Long-storage in HDFS
• For external consumption via
access-contolled Kafka/REST
• We connect to Kafka (real-time) or
access data directly from HDFS (batch)
How data flows into the system
Real-time: Compute TopK TV programs per cluster
• What is each viewer watching?
• Aggregate popular programs across users for each cluster
• Keep Top K (using Twitter Algebird TopK Monoid)
Map Program
Counts to User
Clusters
Create TopKProgram List for
each Cluster
Cluster -> Trending Programs
Count Programs
being viewed
Update Viewer State
Real-Time Implementation
// format event_time|device_id|program_id|station_id|dma_title|tune_type
// get data from Kafka
val tuneEventsPerUser = KafkaUtils.createStream(ssc, zkQuorum, groupId,
topics, storageLevel).flatMap(parseTuneEventByUser)
Real-Time Implementation
// format event_time|device_id|program_id|station_id|dma_title|tune_type
// get data from Kafka
val tuneEventsPerUser = KafkaUtils.createStream(ssc, zkQuorum, groupId,
topics, storageLevel).flatMap(parseTuneEventByUser)
// what is being watched by each user
val userState =
tuneEventsPerUser.updateStateByKey(updateUserHistory).cache()
Real-Time Implementation
// format event_time|device_id|program_id|station_id|dma_title|tune_type
// get data from Kafka
val tuneEventsPerUser = KafkaUtils.createStream(ssc, zkQuorum, groupId,
topics, storageLevel).flatMap(parseTuneEventByUser)
// what is being watched by each user
val userState =
tuneEventsPerUser.updateStateByKey(updateUserHistory).cache()
// aggregate tunes per program per cluster
val tvTunes = userState.map { case (userId,tuneInfo) =>
((tuneInfo.programId,user2cluster(userId)),1) }.reduceByKey(_+_)
Compute Top K Programs per Cluster
import com.twitter.algebird.TopKMonoid
case class ProgramCount (val programId:Long, val count: Int)
extends Ordered[ProgramCount] {
def compare(that: ProgramCount):Int = { ... }
}
val topKMonoid = new TopKMonoid[ProgramCount](topk)
Compute Top K Programs per Cluster
import com.twitter.algebird.TopKMonoid
case class ProgramCount (val programId:Long, val count: Int) extends
Ordered[ProgramCount] {
def compare(that: ProgramCount):Int = { ... }
}
val topKMonoid = new TopKMonoid[ProgramCount](topk)
val tvTopK = tvTunes.map { case ((programId,clusterId),cnt) =>
(clusterId,topKMonoid.build(ProgramCount(programId,cnt)))
}.reduceByKey(topKMonoid.plus)
Compute Top K Programs per Cluster
import com.twitter.algebird.TopKMonoid
case class ProgramCount (val programId:Long, val count: Int) extends
Ordered[ProgramCount] {
def compare(that: ProgramCount):Int = { ... }
}
val topKMonoid = new TopKMonoid[ProgramCount](topk)
val tvTopK = tvTunes.map { case ((programId,clusterId),cnt) =>
(clusterId,topKMonoid.build(ProgramCount(programId,cnt)))
}.reduceByKey(topKMonoid.plus)
// export top tunes to Redis for lookup by web service …
tvTopK.forEachRDD(rdd =>
rdd.forEachPartition(p => (saveToRedis(p)))
Results
• Leverage existing Hadoop infrastructure and data
• Compute 10 user clusters for 100k users in less than 10 minutes using 100 cores , 128GB
RAM
• Consume STB events on a real time basis directly from Kafka
• Calculate Top K trending programs for each cluster in 20 sec micro batches storing the results
in Redis.
• Service requests for Personalized Trending Shows = Happy Customer
“Trending for You” Web Service – Morning
“Trending for You” Web Service – Evening
Final Words
• Thanks to Spark we implemented first version in 2-3 weeks
• Example accelerated adoption of Spark in dev & research
• Many further improvements possible
• Do time-dependent clustering of user tastes
• Gather feedback from real users
• We are hiring! Contact us at jobs.comcast.com
Math Detour : Cluster Normalized User Vectors
• Spark Kmeans can only cluster elements in Euclidean space
• Problem:
• 𝑃1 − 𝑃22 = 𝑈1𝑀 − 𝑈2𝑀
2 = (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 ≠ 𝑈1 − 𝑈2
2
• M.computeSVD 𝑀𝑇 = 𝑋 𝐷 𝑌𝑇 where 𝑋𝑇𝑋 = 𝐼, 𝑌𝑇Y = 𝐼, D = diag
Math Detour : Cluster Normalized User Vectors
• Spark Kmeans can only cluster elements in Euclidean space
• Problem:
• 𝑃1 − 𝑃22 = 𝑈1𝑀 − 𝑈2𝑀
2 = (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 ≠ 𝑈1 − 𝑈2
2
• M.computeSVD 𝑀𝑇 = 𝑋 𝐷 𝑌𝑇 where 𝑋𝑇𝑋 = 𝐼, 𝑌𝑇Y = 𝐼, D = diag
• (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 = (𝑈1 − 𝑈2)𝑌𝐷𝐷𝑌
𝑇(𝑈1 − 𝑈2)𝑇 = 𝑈1 − 𝑈2
2
• With 𝑈 = 𝑈 𝑌 𝐷
Math Detour : Cluster Normalized User Vectors
• Spark Kmeans can only cluster elements in Euclidean space
• Problem:
• 𝑃1 − 𝑃22 = 𝑈1𝑀 − 𝑈2𝑀
2 = (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 ≠ 𝑈1 − 𝑈2
2
• M.computeSVD 𝑀𝑇 = 𝑋 𝐷 𝑌𝑇 where 𝑋𝑇𝑋 = 𝐼, 𝑌𝑇Y = 𝐼, D = diag
• (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 = (𝑈1 − 𝑈2)𝑌𝐷𝐷𝑌
𝑇(𝑈1 − 𝑈2)𝑇 = 𝑈1 − 𝑈2
2
• With 𝑈 = 𝑈 𝑌 𝐷
• KMeans.train( 𝑈,numClusters,numIter)
Math Detour : Cluster Normalized User Vectors
• Spark Kmeans can only cluster elements in Euclidean space
• 𝑃1 − 𝑃22 = 𝑈1𝑀 − 𝑈2𝑀
2 = (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 ≠ 𝑈1 − 𝑈2
2
• M.computeSVD 𝑀𝑇 = 𝑋 𝐷 𝑌𝑇 where 𝑋𝑇𝑋 = 𝐼, 𝑌𝑇Y = 𝐼, D = diag
• (𝑈1 − 𝑈2)𝑀𝑀𝑇(𝑈1 − 𝑈2)𝑇 = (𝑈1 − 𝑈2)𝑌𝐷𝐷𝑌
𝑇(𝑈1 − 𝑈2)𝑇 = 𝑈1 − 𝑈2
2
• With 𝑈 = 𝑈 𝑌 𝐷
• KMeans.train( 𝑈,numClusters,numIter)
Geared towards females
Geared towards males
Serious
Funny
Motivation behind Matrix Factorizations (Latent Space Models)
The Princess
Diaries
The Lion King
Braveheart
Lethal Weapon
Independence
Day
AmadeusThe Color
Purple
Dumb and
Dumber
Ocean’s 11
Sense and
Sensibility
From J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
Geared towards females
Geared towards males
serious
funny
The Effect of Regularization
47J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive
The Princess
Diaries
The Lion King
Braveheart
Lethal Weapon
Independence
Day
AmadeusThe Color
Purple
Dumb and
Dumber
Ocean’s 11
Sense and
Sensibility
Factor 1
Fa
cto
r 2
minfactors “error” + “length”
i
i
x
x
training
xixiQP
qppqr222
,
)(min
Geared towards females
Geared towards males
serious
funny
The Effect of Regularization
48J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive
The Lion King
Braveheart
Lethal Weapon
Independence
Day
AmadeusThe Color
Purple
Dumb and
Dumber
Ocean’s 11
Sense and
Sensibility
Factor 1
Fa
cto
r 2
The Princess
Diaries
minfactors “error” + “length”
i
i
x
x
training
xixiQP
qppqr222
,
)(min
Geared towards females
Geared towards males
serious
funny
The Effect of Regularization
49J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive
The Lion King
Braveheart
Lethal Weapon
Independence
Day
AmadeusThe Color
Purple
Dumb and
Dumber
Ocean’s 11
Sense and
Sensibility
Factor 1
Fa
cto
r 2
minfactors “error” + “length”
The Princess
Diaries
i
i
x
x
training
xixiQP
qppqr222
,
)(min
Geared towards females
Geared towards males
serious
funny
The Effect of Regularization
50J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive
The Lion King
Braveheart
Lethal Weapon
Independence
Day
AmadeusThe Color
Purple
Dumb and
Dumber
Ocean’s 11
Sense and
Sensibility
Factor 1
Fa
cto
r 2
The Princess
Diaries
minfactors “error” + “length”
i
i
x
x
training
xixiQP
qppqr222
,
)(min