What is Flink
§ Large-scale data processing engine
§ Easy and powerful APIs for batch and real-time streaming analysis (Java / Scala)
§ Backed by a very robust execution backend
• with true streaming capabilities,
• a custom memory manager,
• native iteration execution,
• and a cost-based optimizer.
Technology inside Flink
§ Technology inspired by compilers + MPP databases + distributed systems
§ For ease of use, reliable performance, and scalability
case class Path(from: Long, to: Long)

val tc = edges.iterate(10) { paths: DataSet[Path] =>
  val next = paths
    .join(edges)
    .where("to")
    .equalTo("from") { (path, edge) => Path(path.from, edge.to) }
    .union(paths)
    .distinct()
  next
}
(Architecture diagram: technology inside Flink, spanning the pre-flight phase (client), the master, and the workers:
• Cost-based optimizer
• Type extraction stack
• Memory manager
• Out-of-core algorithms
• Real-time streaming
• Task scheduling
• Recovery metadata
• Data serialization stack
• Streaming network stack
• ...)
How do you use Flink?
Example: WordCount
case class Word(word: String, frequency: Int)

val env = ExecutionEnvironment.getExecutionEnvironment()

val lines = env.readTextFile(...)

lines
  .flatMap { line => line.split(" ").map(word => Word(word, 1)) }
  .groupBy("word").sum("frequency")
  .print()

env.execute()
Flink has mirrored Java and Scala APIs that offer the same functionality, including by-name addressing.
Flink API in a Nutshell
§ map, flatMap, filter, groupBy, reduce, reduceGroup, aggregate, join, coGroup, cross, project, distinct, union, iterate, iterateDelta, ...
§ All Hadoop input formats are supported
§ API similar for data sets and data streams with slightly different operator semantics
§ Window functions for data streams
§ Counters, accumulators, and broadcast variables
Machine learning with Flink

Does ML work like that?

More realistic scenario!
Machine learning pipelines
§ Pipelining inspired by scikit-learn
§ Transformer: Modify data
§ Learner: Train a model
§ Reusable components
§ Lets you quickly build ML pipelines
§ Model inherits pipeline of learner
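The Transformer/Learner chaining idea can be sketched locally in a few lines of plain Scala. This is illustrative only: the traits, `Scale`, and `MeanLearner` below are made-up stand-ins for the concept, not the actual Flink-ML API.

```scala
// Minimal sketch of pipelining: chaining a Transformer with a Learner
// yields a new Learner that transforms the data before fitting.
// All names here are hypothetical, not the Flink-ML API.
trait Model { def predict(x: Double): Double }
trait Learner { def fit(data: Seq[Double]): Model }

trait Transformer {
  def transform(data: Seq[Double]): Seq[Double]
  // The resulting model inherits the pipeline of the learner.
  def chain(learner: Learner): Learner = new Learner {
    def fit(data: Seq[Double]): Model =
      learner.fit(Transformer.this.transform(data))
  }
}

// Toy components: a scaling transformer and a mean-predicting learner.
object Scale extends Transformer {
  def transform(data: Seq[Double]): Seq[Double] = data.map(_ * 2.0)
}
object MeanLearner extends Learner {
  def fit(data: Seq[Double]): Model = new Model {
    val mean = data.sum / data.size
    def predict(x: Double): Double = mean
  }
}

val pipeline = Scale.chain(MeanLearner)
val model = pipeline.fit(Seq(1.0, 2.0, 3.0))
println(model.predict(0.0)) // mean of the scaled data: 4.0
```

The real Flink-ML version of this pattern appears in the polynomial-regression example below.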
Linear regression in polynomial space
val polynomialBase = PolynomialBase()
val learner = MultipleLinearRegression()

val pipeline = polynomialBase.chain(learner)

val trainingDS = env.fromCollection(trainingData)

val parameters = ParameterMap()
  .add(PolynomialBase.Degree, 3)
  .add(MultipleLinearRegression.Stepsize, 0.002)
  .add(MultipleLinearRegression.Iterations, 100)

val model = pipeline.fit(trainingDS, parameters)
(Pipeline diagram: Input Data → Polynomial Base Mapper → Multiple Linear Regression → Linear Model)
Current state of Flink-ML
§ Existing learners
• Multiple linear regression
• Alternating least squares
• Communication-efficient distributed dual coordinate ascent (PR pending)
§ Feature transformer
• Polynomial base feature mapper
§ Tooling
Distributed linear algebra
§ Linear algebra: the universal language for data analysis
§ High-level abstraction
§ Fast prototyping
§ Pre- and post-processing steps
Example: Gaussian non-negative matrix factorization
§ Given input matrix V, find W and H such that V ≈ WH
§ Iterative approximation via multiplicative updates (∗ and / elementwise):

H_{t+1} = H_t ∗ (W_t^T V) / (W_t^T W_t H_t)
W_{t+1} = W_t ∗ (V H_{t+1}^T) / (W_t H_{t+1} H_{t+1}^T)
var i = 0
var H: CheckpointedDrm[Int] = randomMatrix(k, V.numCols)
var W: CheckpointedDrm[Int] = randomMatrix(V.numRows, k)

while (i < maxIterations) {
  H = H * (W.t %*% V / W.t %*% W %*% H)
  W = W * (V %*% H.t / W %*% H %*% H.t)
  i += 1
}
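As a concrete illustration of what the distributed code above computes, the same multiplicative updates can be run locally on small dense arrays with no Flink at all. The matrix values and iteration count are toy choices; the point is that the reconstruction error shrinks.

```scala
// Minimal local sketch of the NMF multiplicative updates (no Flink).
type Mat = Array[Array[Double]]

def matmul(a: Mat, b: Mat): Mat =
  Array.tabulate(a.length, b(0).length) { (i, j) =>
    a(i).indices.map(k => a(i)(k) * b(k)(j)).sum
  }

def transpose(a: Mat): Mat =
  Array.tabulate(a(0).length, a.length) { (i, j) => a(j)(i) }

// Elementwise multiplicative update: x * num / den.
def update(x: Mat, num: Mat, den: Mat): Mat =
  Array.tabulate(x.length, x(0).length) { (i, j) =>
    x(i)(j) * num(i)(j) / (den(i)(j) + 1e-9)
  }

// Squared Frobenius reconstruction error of V ≈ WH.
def frobErr(v: Mat, w: Mat, h: Mat): Double = {
  val wh = matmul(w, h)
  (for (i <- v.indices; j <- v(0).indices)
    yield math.pow(v(i)(j) - wh(i)(j), 2)).sum
}

val V: Mat = Array(Array(1.0, 2.0), Array(3.0, 4.0))
var W: Mat = Array(Array(0.6, 0.3), Array(0.2, 0.7))
var H: Mat = Array(Array(0.5, 0.4), Array(0.3, 0.8))

val errBefore = frobErr(V, W, H)
for (_ <- 0 until 200) {
  val Wt = transpose(W)
  H = update(H, matmul(Wt, V), matmul(matmul(Wt, W), H))
  val Ht = transpose(H)
  W = update(W, matmul(V, Ht), matmul(matmul(W, H), Ht))
}
val errAfter = frobErr(V, W, H)

println(errAfter < errBefore) // the updates shrink the error: true
```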
Why is Flink a good fit for ML?
Flink’s features
§ Stateful iterations
• Keep state across iterations
§ Delta iterations
• Limit computation to elements which matter
§ Pipelining
• Avoid materialization of large intermediate state
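The delta-iteration idea, a workset that drives each round and deltas that are merged into the solution, can be sketched locally in plain Scala. The toy graph and connected-components propagation below are illustrative; Flink's `iterateDelta` applies the same pattern in a distributed, incremental way.

```scala
// Minimal local sketch of delta-iteration semantics (no Flink):
// solution set = per-vertex component id, workset = vertices changed last round.
val edges = Map(1 -> Seq(2), 2 -> Seq(1, 3), 3 -> Seq(2), 4 -> Seq(5), 5 -> Seq(4))

var solution: Map[Int, Int] = edges.keys.map(v => v -> v).toMap // init: own id
var workset: Set[Int] = edges.keySet                            // initially all

while (workset.nonEmpty) {
  // Only workset elements produce new candidates: computation is limited
  // to the elements that actually changed.
  val candidates = for {
    v <- workset.toSeq
    n <- edges(v)
    if solution(v) < solution(n)
  } yield n -> solution(v)

  val updates = candidates
    .groupBy(_._1)
    .map { case (v, cs) => v -> cs.map(_._2).min }
    .filter { case (v, c) => c < solution(v) }

  solution = solution ++ updates // merge deltas into the solution set
  workset = updates.keySet       // next workset: only vertices that changed
}

println(solution(3)) // vertices 1-3 collapse to component 1
println(solution(5)) // vertices 4-5 collapse to component 4
```

Note how the workset shrinks each round until it is empty, which is exactly the effect shown in the "Effect of delta iterations" chart below.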
CoCoA
min_{w ∈ R^d} P(w) := (λ/2) ‖w‖² + (1/n) Σ_{i=1}^{n} ℓ_i(w^T x_i)
Bulk Iterations
(Figure: bulk iteration. A step function consumes the partial solution X, possibly joined with other data sets, produces Y, and the iteration result replaces the partial solution for the next round.)
Delta iterations
(Figure: delta iteration. The step function consumes the partial solution X and a workset, possibly joined with other data sets; it produces deltas A and a new workset B. The deltas are merged into the partial solution instead of replacing it, and the workset drives the next round.)
Effect of delta iterations
(Chart: number of elements updated per iteration over 60 iterations. The workset starts at tens of millions of elements and shrinks toward zero, so later iterations touch very little data.)
Iteration performance
(Chart: time in minutes for 61 iterations and 30 iterations of PageRank on a Twitter follower graph, comparing Hadoop MapReduce with Flink using bulk and delta iterations.)
How to factorize really large matrices?
Collaborative Filtering
§ Recommend items based on users with similar preferences
§ Latent factor models capture underlying characteristics of items and preferences of users
§ Predicted preference: r̂_{u,i} = x_u^T y_i
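The predicted preference is just the dot product of the user and item factor vectors. A tiny worked example with made-up factor values:

```scala
// Illustrative only: hypothetical latent factors for one user and one item.
val xU = Array(0.8, 0.1) // user factor vector x_u
val yI = Array(0.9, 0.3) // item factor vector y_i

// r̂_{u,i} = x_u^T y_i
val rHat = xU.zip(yI).map { case (a, b) => a * b }.sum
println(f"$rHat%.2f") // 0.8*0.9 + 0.1*0.3 = 0.75
```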
Matrix factorization
min_{X,Y} Σ_{r_{u,i} ≠ 0} (r_{u,i} − x_u^T y_i)² + λ (Σ_u n_u ‖x_u‖² + Σ_i n_i ‖y_i‖²)

R ≈ X^T Y

(Figure: the rating matrix R factorized into the user matrix X and the item matrix Y.)
Alternating least squares
§ Fixing one matrix gives a quadratic form
§ Solving it is guaranteed to decrease the overall cost function
§ To calculate x_u, all rated item vectors and ratings are needed

x_u = (Y S^u Y^T + λ n_u I)^{−1} Y r_u^T

S^u_{ii} = 1 if r_{u,i} ≠ 0, 0 else
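The user-vector update is a small k × k linear solve per user. A local, illustrative sketch (no Flink; the item vectors, ratings, and λ are toy values, and since the user rated every item here, S^u is the identity):

```scala
// Minimal local sketch of one ALS user update: x_u = (Y S^u Y^T + λ n_u I)^{-1} Y r_u
val k = 2
val lambda = 0.1

// Item vectors as columns of Y: Y(f)(i) = factor f of item i.
val Y = Array(Array(1.0, 0.0), Array(0.0, 1.0))
val rU = Array(4.0, 2.0) // the user's ratings (both items rated)
val nU = rU.length       // number of rated items

// Normal equations: A = Y Y^T + lambda * n_u * I, b = Y r_u.
val A = Array.tabulate(k, k) { (f, g) =>
  (0 until rU.length).map(i => Y(f)(i) * Y(g)(i)).sum +
    (if (f == g) lambda * nU else 0.0)
}
val b = Array.tabulate(k) { f =>
  (0 until rU.length).map(i => Y(f)(i) * rU(i)).sum
}

// Tiny Gaussian elimination (no pivoting; fine for this toy system).
for (p <- 0 until k; r <- p + 1 until k) {
  val m = A(r)(p) / A(p)(p)
  for (c <- p until k) A(r)(c) -= m * A(p)(c)
  b(r) -= m * b(p)
}
val x = Array.fill(k)(0.0)
for (f <- k - 1 to 0 by -1) {
  x(f) = (b(f) - (f + 1 until k).map(g => A(f)(g) * x(g)).sum) / A(f)(f)
}

// With identity item vectors, x_u is the ratings shrunk by 1 + lambda*n_u.
println(x.map(v => math.round(v * 100) / 100.0).mkString(", ")) // 3.33, 1.67
```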
Data partitioning
26
Naïve ALS

case class Rating(userID: Int, itemID: Int, rating: Double)
case class ColumnVector(columnIndex: Int, vector: Array[Double])

val items: DataSet[ColumnVector] = _
val ratings: DataSet[Rating] = _

// Generate tuples of items with their ratings
val uVA = items.join(ratings).where(0).equalTo(1) {
  (item, ratingEntry) => {
    val Rating(uID, _, rating) = ratingEntry
    (uID, rating, item.vector)
  }
}
Naïve ALS contd.

uVA.groupBy(0).reduceGroup {
  vectors => {
    var uID = -1
    val matrix = FloatMatrix.zeros(factors, factors)
    val vector = FloatMatrix.zeros(factors)
    var n = 0

    for ((id, rating, v) <- vectors) {
      uID = id
      vector += rating * v
      matrix += outerProduct(v, v)
      n += 1
    }

    for (idx <- 0 until factors) {
      matrix(idx, idx) += lambda * n
    }

    new ColumnVector(uID, Solve(matrix, vector))
  }
}
Problems of naïve ALS
§ Problem:
• Item vectors are sent redundantly → high network load
§ Solution:
• Blocking of user and item vectors to share common data
• Avoids blown-up intermediate state
Data partitioning
Performance comparison
• 40 node GCE cluster, highmem-8
• 10 ALS iterations with 50 latent factors
Table 1 – ALS runtime over input size; each configuration is reported twice, in seconds and in minutes (the paired source columns differ by exactly a factor of 60):

Entries (million / billion) | highmem-8 (s) | highmem-8 (min) | highmem-16 (s) | highmem-16 (min)
80 / 0.08                   | 232.488       | 3.87            | 201.326        | 3.36
400 / 0.4                   | 447.886       | 7.46            | 658.609        | 10.98
1200 / 1.2                  | 1222.853      | 20.38           | 1910.328       | 31.84
4000 / 4                    | 3799.404      | 63.32           | 6263.355       | 104.39
8000 / 8                    | 8729.444      | 145.49          | 19753.041      | 329.22
28000 / 28                  | 50352.835     | 839.21          | 330            |

(Chart: run time in minutes vs. number of non-zero entries, 0-30 billion, for Blocked ALS, Blocked ALS on highmem-16, and Naive ALS, with annotated points at 1 h, 2.5 h, 5.5 h, and 14 h.)
Table 2 – Naive join vs. broadcast ALS runtime; paired columns are again seconds and minutes:

Entries (million / billion) | Naive Join (s) | Naive Join (min) | Broadcast (s) | Broadcast (min)
80 / 0.08                   | 201.326        | 3.36             | 190.723       | 3.18
400 / 0.4                   | 658.609        | 10.98            | 776.197       | 12.94
1200 / 1.2                  | 1910.328       | 31.84            | 1754.774      | 29.25
4000 / 4                    | 6263.355       | 104.39           | 4486.262      | 74.77
8000 / 8                    | 19753.041      | 329.22           |               |

(Chart: run time in minutes vs. number of non-zero entries, 0-8 billion, for Naive Join and Broadcast.)
Streaming machine learning
Why is streaming ML important?
§ Spam detection in mails
§ Patterns might change over time
§ Retraining of the model becomes necessary
§ Best solution: Online models
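An online model updates itself one example at a time, so it can track drifting patterns without batch retraining. A minimal sketch in plain Scala, using a perceptron-style learner with made-up mail features:

```scala
// Illustrative only: a tiny online spam classifier. The features and
// training stream are hypothetical; spam is labeled 1, ham is -1.
var w = Array(0.0, 0.0, 0.0) // weights for 3 hypothetical mail features

def predict(x: Array[Double]): Int =
  if (w.zip(x).map { case (a, b) => a * b }.sum > 0) 1 else -1

// Mistake-driven update: adjust weights only on misclassified examples.
def learn(x: Array[Double], label: Int): Unit =
  if (predict(x) != label) {
    w = w.zip(x).map { case (a, b) => a + label * b }
  }

// (features, label) pairs arriving one at a time, as in a mail stream.
val stream = Seq(
  (Array(1.0, 0.0, 1.0),  1), // spammy
  (Array(0.0, 1.0, 0.0), -1), // ham
  (Array(1.0, 0.0, 0.5),  1),
  (Array(0.0, 0.5, 0.0), -1)
)
stream.foreach { case (x, y) => learn(x, y) }

println(predict(Array(1.0, 0.0, 1.0))) // flags the spammy pattern: 1
println(predict(Array(0.0, 1.0, 0.0))) // passes the ham pattern: -1
```

If the spam patterns drift, continuing to feed the stream keeps the model current, with no separate retraining job.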
Applications
§ Spam detection
§ Recommendation
§ News feed personalization
§ Credit card fraud detection
Apache SAMOA
§ Scalable Advanced Massive Online Analysis
§ Distributed streaming machine learning framework
§ Incubating at the Apache Software Foundation
§ Runs on multiple stream processing engines (S4, Storm, Samza)
§ Support for Flink is a pending pull request
Supported algorithms
§ Classification: Vertical Hoeffding Tree
§ Clustering: CluStream
§ Regression: Adaptive Model Rules
§ Frequent pattern mining: PARMA
Closing
Flink-ML Outlook
§ Support more algorithms
§ Support for distributed linear algebra
§ Integration with streaming machine learning
§ Interactive programs and Zeppelin
flink.apache.org @ApacheFlink