Page 1: NLP on a Billion Documents: Scalable Machine Learning with Apache Spark

Who am I

User of Spark since 2012

Organiser of the London Spark Meetup

Run the Data Science team at Skimlinks


Apache Spark



>>> thisrdd = sc.parallelize(range(12), 4)

>>> thisrdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

>>> otherrdd = thisrdd.map(lambda x: x % 3)

>>> otherrdd.collect()

[0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2]




>>> otherrdd.zip(thisrdd).collect()

[(0, 0), (1, 1), (2, 2), (0, 3), (1, 4), (2, 5), (0, 6), (1, 7), (2, 8), (0, 9), (1, 10), (2, 11)]

>>> otherrdd.zip(thisrdd).reduceByKey(lambda x, y: x + y).collect()

[(0, 18), (1, 22), (2, 26)]


How not to crash your Spark job

Set the number of reducers sensibly

Configure your PySpark cluster properly

Don't shuffle (unless you have to)

Don't use groupBy

Repartition your data if necessary
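One way to see why groupBy hurts is to count the records that cross the network in a shuffle. The sketch below is plain Python with made-up, skewed partitions (no Spark involved). It assumes reduceByKey's map-side combine emits at most one record per distinct key per partition, while groupByKey ships every record unchanged. The function names are hypothetical.

```python
from collections import defaultdict

# Hypothetical input: 3 partitions of (key, value) records, skewed towards a few keys
partitions = [
    [("a", 1)] * 1000 + [("b", 1)] * 10,
    [("a", 1)] * 500 + [("c", 1)] * 5,
    [("b", 1)] * 200,
]

def shuffled_records_group_by_key(parts):
    """groupByKey: every record crosses the shuffle as-is."""
    return sum(len(p) for p in parts)

def shuffled_records_reduce_by_key(parts):
    """reduceByKey: values are first combined per key inside each partition,
    so only one record per distinct key per partition is shuffled."""
    total = 0
    for p in parts:
        local = defaultdict(int)
        for key, value in p:
            local[key] += value
        total += len(local)
    return total

print(shuffled_records_group_by_key(partitions))   # 1715
print(shuffled_records_reduce_by_key(partitions))  # 5
```

The gap grows with the number of repeated keys per partition, which is exactly the situation in word counting.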


Lots of people will say 'use Scala'

Don't listen to those people.




Naive Bayes - recap
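The recap slide is an image that did not survive extraction. As a hedged reconstruction, this is the standard multinomial naive Bayes decision rule with add-one smoothing; its P(t | c) term is what the Spark code on the following slides computes. Here n_{t,c} is the count of term t in class c, N_c the total term count of class c, and |V| the vocabulary size.

```latex
\hat{c} = \arg\max_{c} \left[ \log P(c) + \sum_{t \in d} \log P(t \mid c) \right],
\qquad
P(t \mid c) = \frac{n_{t,c} + 1}{\sum_{t' \in V} \left( n_{t',c} + 1 \right)} = \frac{n_{t,c} + 1}{N_c + |V|}
```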


Naive Bayes in Spark

from operator import add

# get (class label, word) tuples
label_token = gettokens(docs)
# [(False, u'https'), (True, u'fashionblog'), (True, u'dress'), (False, u'com'), ...]

# emit (token, (1, 0)) for a True label and (token, (0, 1)) for a False label
tokencounter = label_token.map(lambda pair: (pair[1], (int(pair[0]), int(not pair[0]))))
# [(u'https', [0, 1]), (u'fashionblog', [1, 0]), (u'dress', [1, 0]), (u'com', [0, 1]), ...]

# get the word count for each class
termcounts = tokencounter.reduceByKey(lambda x, y: list(map(add, x, y)))
# [(u'https', [100, 112]), (u'fashionblog', [0, 100]), (u'dress', [5, 15]), (u'com', [95, 100]), ...]


Naive Bayes in Spark

from operator import add, truediv

# add a pseudo-count of 1 to each class count of every term (add-one smoothing)
termcounts_plus_pseudo = termcounts.map(lambda tc: (tc[0], list(map(add, tc[1], (1, 1)))))
# [(u'https', [100, 112]), (u'fashionblog', [0, 100]), (u'dress', [5, 15]),...]
# => [(u'https', [101, 113]), (u'fashionblog', [1, 101]), (u'dress', [6, 16]),...]

# get the total number of words in each class
values = termcounts_plus_pseudo.map(lambda tc: tc[1])
totals = values.reduce(lambda x, y: list(map(add, x, y)))
# [1321, 2345]

# P(term | class): divide each smoothed count by its class total
P_t = termcounts_plus_pseudo.map(lambda tc: (tc[0], list(map(truediv, tc[1], totals))))
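To check the arithmetic of these two slides without a cluster, here is a local, plain-Python sketch that mirrors them on a list of (label, token) pairs. It also adds a prediction step the slides do not show. The function names are hypothetical, and the equal class prior in `classify` is an assumption.

```python
import math
from operator import add, truediv

def train(label_token):
    """Local mirror of the Spark pipeline: (label, token) pairs -> P(token | class)."""
    # tokencounter + reduceByKey: per-token [true count, false count]
    termcounts = {}
    for label, token in label_token:
        pair = (int(label), int(not label))
        termcounts[token] = list(map(add, termcounts.get(token, (0, 0)), pair))
    # add a pseudo-count of 1 to every class count
    smoothed = {t: [c[0] + 1, c[1] + 1] for t, c in termcounts.items()}
    # total smoothed word count per class
    totals = [sum(c[0] for c in smoothed.values()),
              sum(c[1] for c in smoothed.values())]
    # P(token | class)
    return {t: list(map(truediv, c, totals)) for t, c in smoothed.items()}

def classify(tokens, P_t, prior_true=0.5):
    """True if the document is more likely under the True class.
    prior_true is an assumption; the slides do not show how priors are set."""
    log_true, log_false = math.log(prior_true), math.log(1 - prior_true)
    for t in tokens:
        if t in P_t:  # tokens never seen in training are ignored
            log_true += math.log(P_t[t][0])
            log_false += math.log(P_t[t][1])
    return log_true > log_false

data = [(True, "dress"), (True, "fashionblog"), (True, "dress"),
        (False, "https"), (False, "com"), (False, "https")]
P_t = train(data)
print(classify(["dress", "fashionblog"], P_t))  # True
print(classify(["https"], P_t))                 # False
```

Working in log space avoids underflow when a document has many tokens.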



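[Diagram: how reduceByKey combines values. Within each partition, combineLocally merges the values for each key into a dict: (k1, 1)(k1, 1) → {k1: 2}, (k1, 2)(k1, 1) → {k1: 3}, (k1, 5) → {k1: 5}. The partial results (k1, 2), (k1, 3) and (k1, 5) are then shuffled and merged by _mergeCombiners into {k1: 10, …}.]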



[Diagram: the same combineLocally / _mergeCombiners flow for k1, annotated with reduceByKey(numPartitions), which sets the number of partitions in the _mergeCombiners stage.]
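The two phases in the diagram can be sketched locally in plain Python. This is an illustration, not PySpark's implementation: it routes partial results with the built-in `hash`, whereas PySpark's default partitioner uses its own `portable_hash`. The function names are hypothetical.

```python
from collections import defaultdict
from functools import reduce
from operator import add

def combine_locally(partition, func):
    """Map side: merge the values of each key within a single partition."""
    acc = {}
    for key, value in partition:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

def reduce_by_key(partitions, func, num_partitions):
    """Sketch of reduceByKey: combine locally, route each partial result to one
    of num_partitions reducers by hash(key), then merge the partials per key."""
    reducers = [defaultdict(list) for _ in range(num_partitions)]
    for partition in partitions:
        for key, partial in combine_locally(partition, func).items():
            reducers[hash(key) % num_partitions][key].append(partial)
    # Reduce side (_mergeCombiners in the diagram)
    return [{key: reduce(func, partials) for key, partials in r.items()}
            for r in reducers]

# The three partitions from the diagram
partitions = [[("k1", 1), ("k1", 1)], [("k1", 2), ("k1", 1)], [("k1", 5)]]
print(reduce_by_key(partitions, add, num_partitions=2))
```

Each key lands on exactly one reducer, so `numPartitions` bounds the parallelism of the merge stage. Too few reducers means each one holds too many keys.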


Naive Bayes in Spark

RDD.aggregate(zeroValue, seqOp, combOp)

Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral “zero value.”
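The semantics of `aggregate` can be mimicked over a plain list of partitions. In this sketch, each partition starts from its own copy of the zero value, mirroring how Spark ships a serialized copy of `zeroValue` to every task. That is why a mutable accumulator object can be updated in place and returned. The function name and the (sum, count) example are illustrative.

```python
import copy
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Local sketch of RDD.aggregate semantics over a list of partitions."""
    # 1. Fold each partition with seq_op, starting from its own copy of the zero value
    partials = [reduce(seq_op, part, copy.deepcopy(zero_value)) for part in partitions]
    # 2. Combine the per-partition results with comb_op
    return reduce(comb_op, partials, copy.deepcopy(zero_value))

# Example: (sum, count) in one pass, e.g. to compute a mean
partitions = [[1, 2], [3, 4], [5]]
total, count = aggregate(partitions, (0, 0),
                         lambda acc, x: (acc[0] + x, acc[1] + 1),
                         lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(total / count)  # 3.0
```

The accumulator type (here a tuple) may differ from the element type (here int). That is what lets `aggregate` build a whole word-count dictionary per partition.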


Naive Bayes in Spark: Aggregation

from operator import add

class WordFrequencyAgreggator(object):

    def __init__(self):
        # token -> [true count, false count]
        self.S = {}

    def add(self, token_count):
        # seqOp: fold one (token, (label, not label)) record into this partition's counts
        token, count = token_count
        self.S[token] = list(map(add, self.S.get(token, (0, 0)), count))
        return self

    def merge(self, other):
        # combOp: add the counts from another partition's aggregator
        for term, count in other.S.items():
            self.S[term] = list(map(add, self.S.get(term, (0, 0)), count))
        return self


Naive Bayes in Spark

With reduceByKey:

termcounts = tokencounter.reduceByKey(lambda x, y: list(map(add, x, y)))
# [(u'https', [0, 1]), (u'fashionblog', [0, 1]), (u'dress', [0, 1]),...]
# => [(u'https', [100, 112]), (u'fashionblog', [0, 100]), (u'dress', [5, 15]),...]

With aggregate:

aggregates = tokencounter.aggregate(WordFrequencyAgreggator(),
                                    lambda x, y: x.add(y),
                                    lambda x, y: x.merge(y))

RDD.aggregate(zeroValue, seqOp, combOp)



Naive Bayes in Spark: Aggregation



Naive Bayes in Spark: treeAggregate


Naive Bayes in Spark: treeAggregate

RDD.treeAggregate(zeroValue, seqOp, combOp, depth=2)

Aggregates the elements of this RDD in a multi-level tree pattern.

With reduceByKey:

termcounts = tokencounter.reduceByKey(lambda x, y: list(map(add, x, y)))
# [(u'https', [0, 1]), (u'fashionblog', [0, 1]), (u'dress', [0, 1]), (u'com', [0, 1]),...]
# ===>
# [(u'https', [100, 112]), (u'fashionblog', [0, 100]), (u'dress', [5, 15]), (u'com', [95, 100]),...]

With treeAggregate:

aggregates = tokencounter.treeAggregate(WordFrequencyAgreggator(),
                                        lambda x, y: x.add(y),
                                        lambda x, y: x.merge(y),
                                        depth=4)
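A plain `aggregate` sends one partial result per partition to the driver, which then merges them all. A tree aggregation merges partial results in intermediate rounds, so the driver only receives a few of them; in Spark those rounds run on the executors. The sketch below is a simplified local model with a fixed fan-in. Spark derives its own fan-in from `depth` and the number of partitions, so treat this as an illustration, not Spark's algorithm. The function name is hypothetical.

```python
from functools import reduce
from operator import add

def tree_aggregate(partitions, zero_value, seq_op, comb_op, fan_in=2):
    """Simplified multi-level aggregation (not Spark's exact scheme).
    Returns the result and the number of combine rounds after the per-partition fold."""
    # Fold every partition independently (assumes an immutable zero_value)
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    rounds = 0
    # Combine groups of fan_in partial results until only one is left
    while len(partials) > 1:
        partials = [reduce(comb_op, partials[i:i + fan_in])
                    for i in range(0, len(partials), fan_in)]
        rounds += 1
    return partials[0], rounds

partitions = [list(range(i * 10, i * 10 + 10)) for i in range(8)]
print(tree_aggregate(partitions, 0, add, add))            # (3160, 3)
print(tree_aggregate(partitions, 0, add, add, fan_in=4))  # (3160, 2)
```

The result is the same whatever the fan-in; only the shape of the merge, and therefore the load on the final merger, changes. This matters when each partial result is a large dictionary of word counts.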


treeAggregate performance

On 1B short documents:

RDD.reduceByKey: 18 min

RDD.treeAggregate: 10 min



Training Word2Vec in Spark

from pyspark.mllib.feature import Word2Vec

inp = sc.textFile("text8_lines").map(lambda row: row.split(" "))

word2vec = Word2Vec()

model = word2vec.fit(inp)


How to use word2vec vectors for classification problems

Averaging

Clustering

Convolutional Neural Network
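Averaging is the simplest of the three options: represent a document by the mean of its words' vectors and feed that fixed-length vector to any classifier. A plain-Python sketch follows. It assumes a dict mapping words to vectors, for instance built from the trained model's `getVectors()` in pyspark.mllib. The function name and toy vectors are hypothetical.

```python
def average_vector(tokens, vectors):
    """Mean of the word vectors for tokens that have one; None if none do."""
    known = [vectors[t] for t in tokens if t in vectors]
    if not known:
        return None
    dim = len(known[0])
    return [sum(v[i] for v in known) / len(known) for i in range(dim)]

# Hypothetical 2-d word vectors
vectors = {"red": [1.0, 0.0], "dress": [0.0, 1.0]}
print(average_vector(["red", "dress", "unknownword"], vectors))  # [0.5, 0.5]
```

Out-of-vocabulary words are skipped rather than treated as zero vectors, so they do not pull the average towards the origin.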



K-Means in Spark

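from numpy import array
from pyspark.mllib.clustering import KMeans, KMeansModel

# lines: hypothetical RDD of tab-separated rows; the first field is skipped and
# the remaining fields are the vector components (the input path is not given)
vectors = lines.map(lambda line: array(
    [float(x) for x in line.split('\t')[1:]]))

# note: the runs parameter is deprecated in later Spark releases
clusters = KMeans.train(vectors, 50000, maxIterations=10,
                        runs=10, initializationMode="random")

# ship the trained model to every executor once
clusters_b = sc.broadcast(clusters)
labels = vectors.map(lambda x: clusters_b.value.predict(x))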
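Broadcasting the model gives every executor all 50,000 centers, so each vector can be labelled locally without a shuffle. With the default Euclidean distance, predicting a label amounts to finding the nearest center. The plain-Python sketch below shows that step. The function name is hypothetical, and this is not the MLlib implementation.

```python
def predict(point, centers):
    """Index of the closest center by squared Euclidean distance."""
    def sq_dist(center):
        return sum((p - q) ** 2 for p, q in zip(point, center))
    return min(range(len(centers)), key=lambda i: sq_dist(centers[i]))

centers = [[0.0, 0.0], [5.0, 5.0]]
print(predict([4.0, 6.0], centers))  # 1
```

This linear scan costs O(k · d) per point, which is worth keeping in mind with k = 50,000.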



Semi-Supervised Naive Bayes

● Build an initial naive Bayes classifier, ŵ, from the labeled documents, X, only

● Loop while classifier parameters improve:

○ (E-step) Use the current classifier, ŵ, to estimate component membership of each unlabeled document, i.e., the probability that each class generated each document.

○ (M-step) Re-estimate the classifier, ŵ, given the estimated class membership of each document.

Kamal Nigam, Andrew McCallum and Tom Mitchell. Semi-supervised Text Classification Using EM. In Chapelle, O., Zien, A., and Scholkopf, B. (Eds.) Semi-Supervised Learning. MIT Press: Boston. 2006.
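The loop above can be sketched locally in plain Python. Labelled documents keep a fixed class weight of 1, and unlabelled documents get soft weights from the E-step. The M-step then re-estimates a multinomial naive Bayes model with add-one smoothing from those weighted counts. The function names are hypothetical, and a fixed number of iterations stands in for "while parameters improve".

```python
import math
from collections import defaultdict

def m_step(docs, weights, classes, alpha=1.0):
    """Estimate log priors and log P(token | class) from soft class weights.
    docs: list of token lists; weights: one {class: weight} dict per doc."""
    vocab = {t for d in docs for t in d}
    class_mass = defaultdict(float)
    token_mass = {c: defaultdict(float) for c in classes}
    for doc, w in zip(docs, weights):
        for c in classes:
            class_mass[c] += w.get(c, 0.0)
            for t in doc:
                token_mass[c][t] += w.get(c, 0.0)
    n_docs = sum(class_mass.values())
    log_prior = {c: math.log((class_mass[c] + alpha) / (n_docs + alpha * len(classes)))
                 for c in classes}
    log_lik = {}
    for c in classes:
        denom = sum(token_mass[c].values()) + alpha * len(vocab)
        log_lik[c] = {t: math.log((token_mass[c][t] + alpha) / denom) for t in vocab}
    return log_prior, log_lik

def e_step(doc, log_prior, log_lik):
    """P(class | doc) under the current model; unseen tokens are ignored."""
    scores = {c: log_prior[c] + sum(log_lik[c].get(t, 0.0) for t in doc)
              for c in log_prior}
    top = max(scores.values())
    unnorm = {c: math.exp(s - top) for c, s in scores.items()}
    z = sum(unnorm.values())
    return {c: v / z for c, v in unnorm.items()}

def semi_supervised_nb(labelled, unlabelled, classes, iterations=10):
    """labelled: list of (tokens, class); unlabelled: list of token lists."""
    label_docs = [d for d, _ in labelled]
    fixed = [{c: 1.0} for _, c in labelled]
    # initial classifier from the labelled documents only
    model = m_step(label_docs, fixed, classes)
    for _ in range(iterations):
        soft = [e_step(d, *model) for d in unlabelled]                  # E-step
        model = m_step(label_docs + unlabelled, fixed + soft, classes)  # M-step
    return model

labelled = [(["dress", "fashion"], True), (["https", "com"], False)]
unlabelled = [["dress", "shoes"], ["com", "login"]]
model = semi_supervised_nb(labelled, unlabelled, [True, False], iterations=5)
print(e_step(["shoes"], *model))
```

"shoes" never appears in a labelled document. It becomes evidence for the True class only because EM assigns its unlabelled document mostly to that class. In the Spark version, the M-step counting is the same reduce or aggregate as before, with probabilities in place of 0/1 labels.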


Naive Bayes in Spark: EM

instead of labels:

tokencounter = label_token.map(lambda pair: (pair[1], (int(pair[0]), int(not pair[0]))))
# [(u'https', [0, 1]), (u'fashionblog', [0, 1]), (u'dress', [0, 1]), (u'com', [0, 1]),...]

use probabilities:

# [(u'https', [0.1, 0.3]), (u'fashionblog', [0.01, .11]), (u'dress', [0.02, 0.02]), (u'com', [0.13, .05]),...]


Naive Bayes in Spark: EM

500K labelled examples: Precision 0.27, Recall 0.15, F1 0.099

Add 10M unlabelled examples, 10 EM iterations: Precision 0.26, Recall 0.31, F1 0.14


Naive Bayes in Spark: EM

240M training examples: Precision 0.31, Recall 0.19, F1 0.12

Add 250M unlabelled examples, 10 EM iterations: Precision 0.26, Recall 0.22, F1 0.12


PySpark Memory: worked example



PySpark Configuration: Worked Example

10 × r3.4xlarge (122 GB RAM, 16 cores each)

Give each executor about half the memory: 60 GB

Number of cores = 120 (12 per machine)

OS: ~12 GB

Python worker processes: 12 × ~4 GB = 48 GB

Cache = 60% × 60 GB × 10 = 360 GB

Each Java task thread: 40% × 60 GB / 12 = ~2 GB
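The budget can be re-checked with a few lines of arithmetic. The variable names are illustrative. The 60%/40% split is read here as the legacy `spark.storage.memoryFraction` default of 0.6 (cache vs. everything else in the executor heap); that interpretation is an assumption.

```python
# Figures from the worked example (per machine unless noted)
machines = 10
ram_gb = 122
executor_gb = 60            # roughly half the machine for the executor JVM
os_gb = 12                  # left for the operating system
cores_per_machine = 12      # 120 cores in total / 10 machines
python_worker_gb = 4        # per Python worker process
storage_fraction = 0.6      # assumed: share of executor memory used for the cache

python_total_gb = cores_per_machine * python_worker_gb                   # 48
used_gb = executor_gb + os_gb + python_total_gb                          # 120
cache_total_gb = storage_fraction * executor_gb * machines               # cluster-wide cache
per_task_gb = (1 - storage_fraction) * executor_gb / cores_per_machine   # per JVM task thread

assert used_gb <= ram_gb, "memory is over-committed"
print(python_total_gb, used_gb, cache_total_gb, per_task_gb)
```

The Python workers live outside the JVM heap, so they must be budgeted separately. This is why the executor gets only half the machine.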

more here:
