TRANSCRIPT
Contributions to Large-Scale Data Processing Systems
PhD Defense
Matthieu Caneill
February 5, 2018
Jury:
Daniel Hagimont (INP Toulouse / ENSEEIHT)
Jean-Marc Menaud (IMT Atlantique)
Sihem Amer-Yahia (CNRS / Université Grenoble Alpes)
Noël De Palma (Université Grenoble Alpes)
Motivation

Worldwide data production

[Figure: worldwide data production, 2008-2018, growing from 0.3 ZB to an estimated 31 ZB; a marker indicates when the PhD started.]

1 zettabyte = 1000 exabytes = 10^6 petabytes = 10^9 terabytes
(1 zettabyte is 2 billion times my hard drive)

2 / 71
Motivation

Applications
- Genome sequencing and querying (human: 3 billion base pairs)
- Web and social networks (Facebook: 600 TB/day in 2014)
- Particle physics (CERN: 1 PB/s of collision data)
- etc.

Problems
- Data management at scale
- Data processing in reasonable time
- ...and at a reasonable price

3 / 71
Research questions

How to design...
- An industrial system to handle monitoring data and make predictions about future failures?
- An algorithm to improve locality in distributed streaming engines?
- A framework to compose data processing algorithms in a descriptive fashion, while reasoning on high-level abstractions?

4 / 71
Outline
Structure of this presentation
1. Online metrics prediction in monitoring systems
2. Locality data routing
3. λ-blocks
4. Conclusion
5 / 71
Metrics prediction in monitoring systems

How to design an industrial system to handle monitoring data and make predictions about future failures?
Metrics prediction in monitoring systems

Actors and roles of Smart Support Center
- Coservit: monitoring services
- HP: cloud computing, hardware
- LIG – AMA: machine learning
- LIG – ERODS: cloud computing, systems

7 / 71
Metrics prediction in monitoring systems

Scope of Smart Support Center

[Diagram: monitoring data feeds a machine learning system running on a cloud.]

- Monitoring insights
- Failure prediction
- Infrastructure scaling
- More server uptime

8 / 71
Metrics prediction in monitoring systems

Challenges
- Scale the monitoring infrastructure (from 1 to N nodes)
- System design for low-latency analytics
- Fault tolerance

9 / 71
Metrics prediction in monitoring systems

Metrics
- Monitoring metric: an observation point on a server in a datacenter
- CPU load, memory, service status
- Reported by agents, processed, and stored
- Computed as time series
- Associated with thresholds: warning and critical

10 / 71
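The warning/critical thresholds above can be sketched as a small zone classifier. This is a minimal illustration, not Coservit's implementation; the threshold values and names below are invented.

```python
def classify(value, warning, critical):
    """Map a metric value to a zone, assuming warning < critical."""
    if value >= critical:
        return "critical"
    if value >= warning:
        return "warning"
    return "ok"

# e.g. a hypothetical disk-usage metric: warning at 80%, critical at 95%
assert classify(50.0, 80.0, 95.0) == "ok"
assert classify(85.0, 80.0, 95.0) == "warning"
assert classify(97.0, 80.0, 95.0) == "critical"
```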
Metrics prediction in monitoring systems

Metrics behaviour: 6 scenarios

[Figure: metric value over time against warning and critical zones, with six labelled scenarios: quick rise, slow rise, transient rise, perplexity point, slow rise, quick rise.]

11 / 71
Metrics prediction in monitoring systems

Linear regression

[Figure: a linear fit over a metric's value/time curve.]

- Able to identify local trends (a few hours)
- Fast to compute
- Good candidate to avoid false positives (peaks)
- Library: MLlib (part of Apache Spark)

12 / 71
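The thesis uses MLlib for the regression itself; as a dependency-free sketch of the underlying idea, an ordinary least-squares fit over a short history can extrapolate when a metric will cross a threshold. The sample values below are made up.

```python
def fit_line(points):
    """Ordinary least-squares fit y = a*t + b over (t, y) pairs."""
    n = len(points)
    st = sum(t for t, _ in points)
    sy = sum(y for _, y in points)
    stt = sum(t * t for t, _ in points)
    sty = sum(t * y for t, y in points)
    a = (n * sty - st * sy) / (n * stt - st * st)
    b = (sy - a * st) / n
    return a, b

def time_to_threshold(points, threshold):
    """Extrapolate the trend; return when the threshold is crossed, or None."""
    a, b = fit_line(points)
    if a <= 0:
        return None  # flat or decreasing trend never reaches the threshold
    return (threshold - b) / a

# Hypothetical disk metric growing 1 unit/hour, warning threshold at 695
history = [(0, 688.0), (1, 689.0), (2, 690.0), (3, 691.0)]
assert abs(time_to_threshold(history, 695.0) - 7.0) < 1e-9  # crossed at hour 7
```

A peak in the history pulls the fitted slope only slightly, which is why the slide calls this a good candidate against transient false positives.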
Metrics prediction in monitoring systems

System architecture

[Diagram: monitoring agents report to a monitoring broker; data is stored in a Cassandra database, processed with Spark + MLlib, and consumed by an alert manager, a GUI, ...]

13 / 71
Metrics prediction in monitoring systems

System architecture

Desired properties
- Scalable: up to a few servers (150 CPU cores) to handle Coservit's load
- End-to-end fault tolerance: metrics can never be lost
- Performance: "fast" computation of metric predictions

14 / 71
Metrics prediction in monitoring systems

Evaluation

Setup
- Hardware: 4 servers (16-28 cores, 128-256 GB RAM)
- Dataset: replay of production data recorded at Coservit
- 424,206 metrics, 1.5 billion data points, monitored on 25,070 servers

15 / 71
Metrics prediction in monitoring systems

Evaluation

[Figure: swap memory metric over 10 hours; past values, then predicted values computed at the prediction point, compared against the actual future values.]

16 / 71
Metrics prediction in monitoring systems

Evaluation

[Figure: physical memory metric over 10 hours; past, predicted, and actual future values.]

17 / 71
Metrics prediction in monitoring systems

Evaluation

[Figure: disk partition metric over 20 hours approaching the warning threshold; the prediction raises an alert ("disk full in 10 mins"). Past, predicted, and actual future values shown.]

18 / 71
Metrics prediction in monitoring systems

Evaluation

Metric blacklisting
- Some metrics are too volatile and hard to predict
- To avoid false positives/negatives, and to save resources, they are blacklisted
- Root Mean Square Error (RMSE) evaluated weekly
- Metrics (temporarily) blacklisted if their RMSE > threshold
- 58.5% of the metrics have a low RMSE → good predictions

19 / 71
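The weekly blacklisting step can be sketched as follows; the metric names, series, and threshold are illustrative, not values from the evaluation.

```python
import math

def rmse(predicted, observed):
    """Root Mean Square Error between two equal-length series."""
    assert len(predicted) == len(observed)
    return math.sqrt(sum((p - o) ** 2
                         for p, o in zip(predicted, observed)) / len(predicted))

def weekly_blacklist(metrics, threshold):
    """Names of metrics whose prediction error exceeds the threshold."""
    return {name for name, (pred, obs) in metrics.items()
            if rmse(pred, obs) > threshold}

metrics = {
    "cpu_load": ([1.0, 2.0, 3.0], [1.1, 2.1, 2.9]),  # predictable
    "network":  ([1.0, 2.0, 3.0], [9.0, 0.5, 7.0]),  # too volatile
}
assert weekly_blacklist(metrics, threshold=1.0) == {"network"}
```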
Metrics prediction in monitoring systems

Evaluation

CPU load and memory consumption

[Figure: CPU and memory usage (0-120%) over time on (a) the master and (b) slave-1, running on 4 machines and 100 cores for 15 minutes.]

20 / 71
Metrics prediction in monitoring systems

Evaluation

Time repartition

[Figure: time (ms) spent in each stage of predicting a metric: load, create dataframe, train, predict, save, publish.]

21 / 71
Metrics prediction in monitoring systems

Evaluation

Load handling
- End-to-end process for the prediction of 1 metric: 1 second.
- One monitoring server (with 24 cores) can handle the load of 1440 metrics (at worst), i.e. 85 servers on average.

22 / 71
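A back-of-the-envelope check of the slide's capacity claim. The once-per-minute prediction cadence is an assumption of this sketch (it is what makes 24 cores × 60 s come out to 1440); the metric and server counts are from the evaluation setup.

```python
# Assumption: each metric is re-predicted once per minute, taking ~1 s end to end.
cores = 24
seconds_per_prediction = 1
predictions_per_minute = cores * 60 // seconds_per_prediction
assert predictions_per_minute == 1440

metrics_total, servers_total = 424_206, 25_070
metrics_per_server = metrics_total / servers_total   # about 16.9 on average
assert round(predictions_per_minute / metrics_per_server) == 85
```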
Metrics prediction in monitoring systems

Evaluation

Load handling: linear scaling

[Figure: amount of metrics handled (x1000) in 15 minutes as a function of CPU cores (0-140), with 1, 2, and 3 slaves; throughput scales linearly.]

23 / 71
Metrics prediction in monitoring systems

Related work

Positioning
No published work exhibits the same system (an end-to-end system for monitoring metrics prediction, storage, and blacklisting).

Prediction models
- Hardware failures [CAS12]
- Capacity planning (e.g. Microsoft Azure [mic])
- Datacenter temperature (e.g. Thermocast [LLL+11])
- Monitoring metrics (e.g. Zabbix [zab] with manual tuning)

24 / 71
Locality data routing

How to design an algorithm to improve locality in distributed streaming engines?
Locality data routing

Actors
Collaboration with Vincent Leroy (SLIDE) and Ahmed El-Rheddane (ERODS).

26 / 71
Locality data routing

Distributed streaming engines

Goals
- Real-time message handling
- Real-time metric calculations
- Parallelization
- Fault tolerance

27 / 71
Locality data routing

Distributed streaming engines

Apache Storm → topologies.

[Figure: trending-hashtags topology: S → A (extract) → B (lower) → C (count).]

S sends tweets, operator A extracts hashtags, B converts them to lowercase, and C counts the frequency of each hashtag.

Division into tasks → distribution and parallelization made easy.

28 / 71
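The division into small operators can be sketched in plain Python (the tweets below are invented, and real Storm operators run as distributed tasks exchanging tuples, not as local function calls):

```python
from collections import Counter

def extract(tweet):   # operator A: pull hashtags out of the text
    return [w for w in tweet.split() if w.startswith("#")]

def lower(tags):      # operator B: normalize case
    return [t.lower() for t in tags]

def count(stream):    # operator C: frequency of each hashtag
    freq = Counter()
    for tweet in stream:
        freq.update(lower(extract(tweet)))
    return freq

tweets = ["#Java is verbose", "#java rocks", "#Python too"]
freq = count(tweets)
assert freq["#java"] == 2 and freq["#python"] == 1
```

Because each operator is a pure transformation over its input, the engine is free to run many instances of A, B, and C in parallel.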
Locality data routing

Stateful operators

States are associated with keys.
For example, operator C can keep the list of trending hashtags (values) per location (keys).

[Figure: the topology S → A (extract) → B (lower) → C (count), with C holding state.]

29 / 71
Locality data routing

Stateful operators

Parallelization
To keep a consistent state, the same keys must be routed to the same instance.

[Figure: S feeds instances A1, A2, then B1, B2, then C1, C2, C3; the keys "foo" and "bar" are each consistently routed to a single C instance. Tasks A and B are stateless, C is stateful.]

30 / 71
Locality data routing

Situation
Let's have two stateful operators, each with two instances.

[Figure: S feeds A1 and B1 on server 1, A2 and B2 on server 2.]

Goal
Minimize the traffic between the machines: A1 → B2 and A2 → B1.
By default, locality = 1/parallelism.

Constraint
Keep a good load balance between the machines.

31 / 71
Locality data routing

Keys correlation

Dynamically instrument the key couples and represent them with a bipartite graph.

[Figure: bipartite key-correlation graph. Location keys Asia (7443) and Oceania (5190) connect to hashtag keys #java (4664), #ruby (3892), and #python (4077), with edge weights Asia-#java 3463, Asia-#ruby 3011, Asia-#python 969, Oceania-#java 1201, Oceania-#ruby 881, Oceania-#python 3108; the graph is partitioned across server 1 and server 2.]

Routing tables
- S: Asia → A1, Oceania → A2
- A1: #java → B1, #ruby → B1, #python → B2
- A2: #python → B2, #java → B1, #ruby → B1

Graph partitioning → optimized routing, favoring local links.

32 / 71
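The step from co-occurrence weights to routing tables can be illustrated with a greedy sketch: once the upstream keys are placed, route each downstream key to the server it exchanges the most traffic with. The weights are those of the slide's graph; a real implementation would use a proper graph partitioner that also enforces load balance.

```python
from collections import defaultdict

def build_routing(edges, upstream_assign, n_servers=2):
    """Greedy: route each downstream key to the server holding most of
    the upstream keys it co-occurs with."""
    traffic = defaultdict(lambda: [0] * n_servers)
    for (up, down), w in edges.items():
        traffic[down][upstream_assign[up]] += w
    return {down: max(range(n_servers), key=lambda s: t[s])
            for down, t in traffic.items()}

edges = {("Asia", "#java"): 3463, ("Asia", "#ruby"): 3011,
         ("Asia", "#python"): 969, ("Oceania", "#java"): 1201,
         ("Oceania", "#ruby"): 881, ("Oceania", "#python"): 3108}
upstream = {"Asia": 0, "Oceania": 1}   # Asia → server 1, Oceania → server 2
routing = build_routing(edges, upstream)
assert routing == {"#java": 0, "#ruby": 0, "#python": 1}
```

This reproduces the slide's tables: #java and #ruby co-locate with Asia, #python with Oceania, so the heavy edges stay local.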
Locality data routing

[Animation: messages flow through S, A1, A2, B1, B2 spread over server 1 and server 2, and fill the routing tables.
- Message "#python doesn't have braces", posted from Oceania: S records Oceania → A1, A records python → B2.
- Message "#java is a verbose language", posted from Asia: S records Asia → A2, A records java → B1.
- A correlation between Oceania/python and Asia/java is detected; a reconfiguration is computed and applied, swapping the routes to python → B1 and java → B2.
- Message "#python is pretty cool!", posted from Oceania, now stays on a single server.]

33 / 71
Locality data routing

Trends evolve with time
Correlations between keys change frequently.

[Figure: daily frequency (0-400 per day) of #nevertrump in Virginia, Texas, and Florida, March 2-13, 2016.]

34 / 71
Locality data routing

Locality decay
- Key correlations evolve with time.
- Routing tables optimized by examining old data lead to decreased locality.

Reconfiguration
- We re-compute the tables every N minutes.
- Difficulty: keep the state consistent.

35 / 71
Locality data routing

Reconfiguration protocol

Solution: an online reconfiguration protocol
- update the routing tables in a live system
- without losing any message or state

36 / 71
Locality data routing

Reconfiguration protocol

[Figure: sequence diagram between the master M and instances A1, A2, B1, B2.]

① Get statistics
② Send statistics; the master partitions the graph and computes the new routing tables
③ Send reconfiguration
④ Send ACK
⑤ Propagate
⑥ Transfer key states; propagate to the next operator

37 / 71
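The state-consistency requirement of step ⑥ can be sketched as follows: every key whose route changed must have its state moved to the new instance before messages resume. The keys, states, and two-instance setup below are illustrative; the real protocol interleaves this with message buffering and ACKs.

```python
def apply_reconfiguration(old_routing, new_routing, states):
    """Move the state of every re-routed key to its new instance (step 6)."""
    moves = {k: (old_routing[k], new_routing[k])
             for k in old_routing if old_routing[k] != new_routing[k]}
    for key, (src, dst) in moves.items():
        states[dst][key] = states[src].pop(key)
    return moves

states = {0: {"#python": 42}, 1: {"#java": 7}}   # per-instance key → state
old = {"#python": 0, "#java": 1}
new = {"#python": 1, "#java": 0}                 # the partitioner swapped both keys
apply_reconfiguration(old, new, states)
assert states == {0: {"#java": 7}, 1: {"#python": 42}}
```

No state is duplicated or dropped: each key's counter simply follows its new route.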
Locality data routing

Evaluation

Datasets
- From Flickr and Twitter
- Fields: location (country or place), hashtag
- Size: 173M records (Flickr), 100M (Twitter)

Setup
- 8 machines, 128 GB RAM and 20 cores each.
- Computation of aggregated statistics (stateful workers).
- Parallelism (2..6), network speed (1 Gb/s or 10 Gb/s), message size (0..20 kB).

38 / 71
Locality data routing
Evaluation
Great speed-up when network is the bottleneck.
Highly dependent on message size.
39 / 71
Locality data routing

Evaluation – Flickr

Throughput (Ktuples/s) on a 10 Gb/s network, parallelism 6

[Figure: throughput over 30 minutes, with and without reconfiguration, for (a) 4 kB and (b) 8 kB messages.]

40 / 71
Locality data routing

Evaluation – Flickr

Throughput (Ktuples/s) on a 1 Gb/s network, parallelism 6

[Figure: throughput over 30 minutes, with and without reconfiguration, for (a) 4 kB and (b) 8 kB messages.]

41 / 71
Locality data routing

Evaluation – Flickr

Average throughput with a 1 Gb/s network, 4 kB message size

[Figure: average throughput (Ktuples/s) vs. parallelism (2-6), with and without reconfiguration, measured after the first reconfiguration.]

42 / 71
Locality data routing

Evaluation – Flickr

Locality, with parallelism 6

[Figure: locality (0-60%) over 25 weeks for hash-based, offline, and online routing.]

43 / 71
Locality data routing

Evaluation – Flickr

Locality when changing the number of collected key correlations

[Figure: locality (0-80%) vs. number of edges (logarithmic scale, 10^1 to 10^7), for parallelism 2 to 6.]

44 / 71
Locality data routing
Related work
Scheduling: placement of operators on servers
I Using the topology [ABQ13]
I Using observed communication patterns [ABQ13]
I Using observed and/or estimated CPU and memory patterns [FB15, PHH+15]
Load balancing: limit impact of data skew
I Partial key grouping [NMG+15]
I Special routing for frequent keys [RQA+15]
Co-location of correlated keys
I Database partitions [CJZM10], social networks [BJJL13]
45 / 71
λ-blocks
How to design a framework to compose data processing algorithms in a descriptive fashion, while reasoning on high-level abstractions?
λ-blocks
Design goals
I A data processing abstraction
I A graph of code blocks to represent an end-to-end processing system
I Separation of concerns: low-level data operations, high-level data processing programs
I Maximize reuse of code
I Compatible with existing (specialized) frameworks and possibility to mix them
I Graph manipulation toolkit
I Bring simplicity to large-scale data processing
47 / 71
λ-blocks
Topologies
[Diagram: read file (/etc/passwd) → filter (contains: 'root') → count]
48 / 71
λ-blocks
Topologies
"""Counts system users.
"""
def main():
with open('/etc/passwd') as f:
return len(f.readlines())
if __name__ == '__main__':
print(main())
$ wc -l /etc/passwd
49 / 71
λ-blocks
Topologies
---
name: count_users
description: Count number of system users
modules: [lb.blocks.foo]
---
- block: readfile
  name: my_readfile
  args:
    filename: /etc/passwd
- block: count
  name: my_count
  inputs:
    data: my_readfile.result
50 / 71
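A topology like count_users can be executed by resolving each block's inputs and running blocks once their dependencies are computed. A minimal sketch of that idea, with a hypothetical registry standing in for lb.blocks (not the actual λ-blocks engine):

```python
# Minimal sketch of executing such a topology: run each block once all
# of its inputs are available (assumes the graph is acyclic). The
# registry and block implementations are hypothetical stand-ins.

def run_topology(blocks, registry):
    """blocks: list of dicts as parsed from the YAML topology."""
    results = {}   # instance name -> {connector: value}
    done = set()
    while len(done) < len(blocks):
        for b in blocks:
            name = b['name']
            if name in done:
                continue
            inputs = b.get('inputs', {})
            deps = {ref.split('.')[0] for ref in inputs.values()}
            if not deps <= done:
                continue  # some upstream block not computed yet
            # Resolve 'my_readfile.result'-style references.
            kwargs = {arg: results[ref.split('.')[0]][ref.split('.')[1]]
                      for arg, ref in inputs.items()}
            fn = registry[b['block']](**b.get('args', {}))
            results[name] = fn(**kwargs)
            done.add(name)
    return results

def _readfile(filename):
    def run():
        with open(filename) as f:
            return {'result': f.readlines()}
    return run

registry = {
    'readfile': _readfile,
    'count': lambda: (lambda data: {'result': len(data)}),
}
```

With the count_users topology above, the engine would first run my_readfile (no inputs), then my_count, which receives my_readfile's result connector.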
λ-blocks
Blocks
I read http
I plot bars
I show console
I write line
I write lines
I split
I concatenate
I map list
I flatMap
I flatten list
I group by count
I sort
I get spark context
I spark readfile
I spark text to words
I spark map
I spark filter
I spark flatMap
I spark mapPartitions
I spark sample
I spark union
I spark intersection
I spark distinct
I spark groupByKey
I spark reduceByKey
I spark aggregateByKey
I spark sortByKey
I spark join
I spark cogroup
I spark cartesian
I spark pipe
I spark coalesce
I spark repartition
I spark reduce
I spark collect
I spark count
I spark first
I spark take
I spark takeSample
I spark takeOrdered
I spark saveAsTextFile
I spark countByKey
I spark foreach
I spark add
I spark swap
I twitter search
I cat
I grep
I cut
I head
I tail
51 / 71
λ-blocks
Blocks
@block(engine='localpython')
def take(n: int=0):
    """Truncates a list of integers.

    :param int n: The length of the desired result.
    :input List[int] data: The list of items to truncate.
    :output List[int] result: The truncated result.
    """
    def inner(data: List[int]) -> ReturnType[List[int]]:
        assert n <= len(data)
        return ReturnEntry(result=data[:n])
    return inner
52 / 71
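The @block decorator registers a block factory so topologies can refer to it by name. A minimal sketch of how it could work, with ReturnEntry simplified to a plain dict and the registry name hypothetical (the actual λ-blocks API may differ):

```python
from typing import List

BLOCK_REGISTRY = {}  # hypothetical name; maps block name -> factory

def block(engine='localpython'):
    """Register a block factory under its function name; the engine
    argument would tell the framework where the block can run."""
    def decorator(factory):
        BLOCK_REGISTRY[factory.__name__] = factory
        return factory
    return decorator

def ReturnEntry(**outputs):
    # Simplified: a block's outputs are just named connectors.
    return outputs

@block(engine='localpython')
def take(n: int = 0):
    """The slide's example block, with ReturnType elided."""
    def inner(data: List[int]):
        assert n <= len(data)
        return ReturnEntry(result=data[:n])
    return inner
```

Instantiating BLOCK_REGISTRY['take'](n=3) yields a function the engine can wire to upstream outputs; args come from the topology file, inputs from other blocks.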
λ-blocks
Sub-topologies
[Diagram: readfile → (sub-topology count_pb: filter → count) → print]
53 / 71
λ-blocks
Sub-topologies
---
name: count_pb
---
- block: filter
  name: filter
  args:
    contains: error
  inputs:
    data: $inputs.data
- block: count
  name: count
  inputs:
    data: filter.result

---
name: foo_errors
---
- block: readfile
  name: readfile
  args:
    filename: foo.log
- topology: count_pb
  name: count_pb
  bind_in:
    data: readfile.result
  bind_out:
    result: count.result
- block: print
  name: print
  inputs:
    data: count_pb.result
54 / 71
λ-blocks
Architecture
[Diagram: topologies and block libraries feed the blocks registry; the graph engine executes topologies, with graph plugins and an API/CLI on top]
55 / 71
λ-blocks
Graph manipulations
I Verification (e.g. type checking)
I Instrumentation
I Caching
I Debugging tools
I Optimizations
I Monitoring
I Program reasoning and semantics
56 / 71
λ-blocks
Graph manipulations
I Reasoning on the computation graph as a high-level object
I Plugin system
I Hooks:
I before graph execution: pre-processing, optimizations, verifications
I after graph execution: post-processing
I before block execution: observation, optimizations
I after block execution: observation
57 / 71
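These hooks can be wired as plain callback lists that plugins append to via decorators, matching the @before_block_execution style used in the instrumentation example. A simplified sketch (hypothetical names, not the actual plugin API):

```python
# Hypothetical plugin API sketch: plugins register callbacks on the
# hooks; the engine fires them around graph and block execution.
HOOKS = {'before_graph': [], 'after_graph': [],
         'before_block': [], 'after_block': []}

def before_block_execution(fn):
    HOOKS['before_block'].append(fn)
    return fn

def after_block_execution(fn):
    HOOKS['after_block'].append(fn)
    return fn

def run_block(block, execute):
    """Engine-side wrapper: fire hooks before and after one block."""
    for fn in HOOKS['before_block']:
        fn(block)
    results = execute(block)
    for fn in HOOKS['after_block']:
        fn(block, results)
    return results
```

Because the hooks observe the graph and blocks from outside, plugins stay decoupled from block implementations.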
λ-blocks
Graph manipulation example: instrumentation (excerpt)
import time
from collections import defaultdict

by_block = defaultdict(dict)  # timing by block: begin, duration

@before_block_execution
def store_begin_time(block):
    name = block.fields['name']
    by_block[name]['begin'] = time.time()

@after_block_execution
def store_end_time(block, results):
    name = block.fields['name']
    by_block[name]['duration'] = \
        time.time() - by_block[name]['begin']
58 / 71
λ-blocks
Graph manipulation example: instrumentation (excerpt)
@after_graph_execution
def show_times(results):
    longest_first = sorted(by_block,
                           key=lambda b: by_block[b]['duration'],
                           reverse=True)
    for blockname in longest_first:
        print('{}\t{}'.format(
            blockname,
            by_block[blockname]['duration']))
59 / 71
λ-blocks
Graph manipulation example: instrumentation
block          duration (ms)
read http      818
write lines    54
grep           49
split          20
60 / 71
λ-blocks
Evaluation
Setup
I Wordcount over https: local machine, 8 cores, 16 GB RAM
I Wordcount over disk: local machine, 8 cores, 16 GB RAM
I PageRank on Spark: Spark on 1 server (24 cores, 128 GB RAM)
61 / 71
λ-blocks
Evaluation
Performances
[Bar chart: real, user, and sys time (s) for LB, LB+plugins, and plain Python]
Figure: Wordcount over https: Twitter feed.
62 / 71
λ-blocks
Evaluation
Performances
[Bar chart: real, user, and sys time (s) for LB, LB+plugins, and plain Python]
Figure: Wordcount over disk: Wikipedia dataset.
63 / 71
λ-blocks
Evaluation
Performances
[Bar chart: real, user, and sys time (s) for LB, LB+plugins, and plain Python]
Figure: PageRank on Wikipedia hyperlinks with Spark.
64 / 71
λ-blocks
Evaluation
Maximum overhead measured per topology: 50 ms
65 / 71
λ-blocks
Related work
Dataflow programming
I ML pipelines: scikit-learn [PVG+11], Spark [The17a], Orange framework [DCE+13]
I Real-time: Apache Beam [apa], StreamPipes [RKHS15]
Blocks programming
I Recognition over recall, immediate feedback [BGK+17]
Graphs from configuration
I Pyleus [Yel16], Storm Flux [The17b]
Other
I “Serverless” architectures and stateless functions [JVSR17]
66 / 71
Conclusion
Context
Computer systems to process large quantities of data.
Problems: how to design. . .
I An industrial system to handle monitoring data and make predictions about future failures?
I An algorithm to improve locality in distributed streaming engines?
I A framework to compose data processing algorithms in a descriptive fashion, while reasoning on high-level abstractions?
67 / 71
Conclusion
Contributions
             Metrics prediction   Locality routing        λ-blocks
What it is   Industrial system    Online routing library  Data processing abstraction
Layer        End-to-end           Low                     High
Improves     Uptimes              Throughput              Programmability
68 / 71
Conclusion
Future work
Metrics prediction in monitoring systems
I Predictions on long-term global trends
I Ticketing mechanism
Locality data routing
I Replace binary locality/non-locality with distance
I Smarter way to determine when to reschedule
I Extend to more complex topologies
69 / 71
Conclusion
Future work
λ-blocks
I Explore more graph manipulation abstractions (complexity analysis, serialization, verification. . . )
I Streaming and online operations
I Tight integration with clusters (data storage, caches, etc)
70 / 71
Thanks! Questions?
λ-blocks
Using a Spark cluster
λ-blocks
[Diagram: λ-blocks topology driving a Spark cluster (master + 3 slaves); some blocks call Spark, others are normal blocks]
1 / 14
λ-blocks
Signature algorithm
H(B) = h(B.name,     block name (not instance name)
        B.args,      list of (name, value) tuples
        B.inputs)    list of (name, H(block), connector) tuples
2 / 14
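The signature is recursive: two block instances running the same block with the same arguments on identically-signed inputs get the same signature, regardless of instance names, which is what makes caching results by signature possible. A minimal sketch, assuming h is SHA-256 over a canonical serialization and inputs map a name to an (upstream block, connector) pair; these representation choices are assumptions, not the actual implementation:

```python
import hashlib

def h(*parts):
    """Hash a canonical serialization of the given parts."""
    m = hashlib.sha256()
    for p in parts:
        m.update(repr(p).encode())
    return m.hexdigest()

def H(block):
    """Structural signature of a block: depends on the block name, its
    args, and (recursively) the signatures of its inputs, never on
    instance names. Inputs are sorted so the order in the topology
    file does not change the signature."""
    return h(block['block'],
             sorted(block.get('args', {}).items()),
             [(name, H(src), connector)
              for name, (src, connector)
              in sorted(block.get('inputs', {}).items())])
```

Two topologies that read the same file and count its lines share a signature even if they name the instances differently, while changing any upstream argument changes every downstream signature.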
λ-blocks
Evaluation
Engine instrumentation
[Bar chart: cumulative time (s) across phases (1)–(6), for: all blocks + plugins, selected blocks + plugins, selected blocks]
Figure: Wordcount program running under different setups. (1) Startup (modules import, etc); (2) Blocks registry creation, block modules import; (3) Plugin import; (4) YAML parsing and graph creation; (5) Graph checks; (6) Graph execution.
3 / 14
Metrics prediction in monitoring systems
Database schema
metrics
  metric id     uuid
  metric name   text
  group id      uuid

measurements
  metric id     uuid
  timestamp     int
  warn          text
  crit          text
  max           double
  min           double
  value         double
  metric name   text
  metric unit   text

predictions
  metric id         uuid
  timestamp         int
  predicted values  list
4 / 14
Images credits
I Data Center operators verifying network cable integrity, CC-BY-SA, https://commons.wikimedia.org/wiki/File:Dc_cabling_50.jpg
I Tokyo metro map, http://bento.com/subtop5.html
I Goto e spaghetti code, http://blogbv2.altervista.org/HD/il-goto-e-la-buona-programmazione-parte-ii/
5 / 14
Bibliography I
Leonardo Aniello, Roberto Baldoni, and Leonardo Querzoni. Adaptive online scheduling in storm. In Proceedings of the 7th ACM International Conference on Distributed Event-based Systems, DEBS '13, pages 207–218. ACM, 2013.
Apache Beam.https://beam.apache.org/.
David Bau, Jeff Gray, Caitlin Kelleher, Josh Sheldon, and Franklyn Turbak. Learnable programming: Blocks and beyond. Commun. ACM, 60(6):72–80, May 2017.
6 / 14
Bibliography II
Xiao Bai, Arnaud Jegou, Flavio Junqueira, and Vincent Leroy. Dynasore: Efficient in-memory store for social applications. In Middleware 2013 - ACM/IFIP/USENIX 14th International Middleware Conference, Beijing, China, December 9-13, 2013, Proceedings, pages 425–444, 2013.
T. Chalermarrewong, T. Achalakul, and S. C. W. See. Failure prediction of data centers using time series and fault tree analysis. In 2012 IEEE 18th International Conference on Parallel and Distributed Systems, pages 794–799, Dec 2012.
Carlo Curino, Evan Jones, Yang Zhang, and Sam Madden. Schism: A workload-driven approach to database replication and partitioning. Proc. VLDB Endow., 3(1-2):48–57, September 2010.
7 / 14
Bibliography III
Janez Demsar, Tomaz Curk, Ales Erjavec, Crt Gorup, Tomaz Hocevar, Mitar Milutinovic, Martin Mozina, Matija Polajnar, Marko Toplak, Anze Staric, Miha Stajdohar, Lan Umek, Lan Zagar, Jure Zbontar, Marinka Zitnik, and Blaz Zupan. Orange: Data mining toolbox in python. Journal of Machine Learning Research, 14:2349–2353, 2013.
Lorenz Fischer and Abraham Bernstein. Workload scheduling in distributed stream processors using graph partitioning. In 2015 IEEE International Conference on Big Data, Big Data 2015, Santa Clara, CA, USA, October 29 - November 1, 2015, pages 124–133, 2015.
8 / 14
Bibliography IV
Eric Jonas, Shivaram Venkataraman, Ion Stoica, and Benjamin Recht. Occupy the cloud: Distributed computing for the 99%. arXiv preprint arXiv:1702.04024, 2017.
Lei Li, Chieh-Jan Mike Liang, Jie Liu, Suman Nath, Andreas Terzis, and Christos Faloutsos. Thermocast: A cyber-physical forecasting model for datacenters. In Proceedings of the 17th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, KDD '11, pages 1370–1378, New York, NY, USA, 2011. ACM.
9 / 14
Bibliography V
Microsoft cloud Azure. https://docs.microsoft.com/en-us/azure/machine-learning/machine-learning-algorithm-choice.
Muhammad Anis Uddin Nasir, Gianmarco De Francisci Morales, David García-Soriano, Nicolas Kourtellis, and Marco Serafini. The power of both choices: Practical load balancing for distributed stream processing engines. In 31st IEEE International Conference on Data Engineering, ICDE, pages 137–148, 2015.
10 / 14
Bibliography VI
Boyang Peng, Mohammad Hosseini, Zhihao Hong, Reza Farivar, and Roy Campbell. R-storm: Resource-aware scheduling in storm. In Proceedings of the 16th Annual Middleware Conference, Middleware '15, pages 149–161, New York, NY, USA, 2015. ACM.
F. Pedregosa, G. Varoquaux, A. Gramfort, V. Michel, B. Thirion, O. Grisel, M. Blondel, P. Prettenhofer, R. Weiss, V. Dubourg, J. Vanderplas, A. Passos, D. Cournapeau, M. Brucher, M. Perrot, and E. Duchesnay. Scikit-learn: Machine learning in Python. Journal of Machine Learning Research, 12:2825–2830, 2011.
11 / 14
Bibliography VII
Dominik Riemer, Florian Kaulfersch, Robin Hutmacher, and Ljiljana Stojanovic. Streampipes: solving the challenge with semantic stream processing pipelines. In Proceedings of the 9th ACM International Conference on Distributed Event-Based Systems, pages 330–331. ACM, 2015.
Nicolo Rivetti, Leonardo Querzoni, Emmanuelle Anceaume, Yann Busnel, and Bruno Sericola. Efficient key grouping for near-optimal load balancing in stream processing systems. In Proceedings of the 9th ACM International Conference on Distributed Event-Based Systems, DEBS '15, pages 80–91, New York, NY, USA, 2015. ACM.
12 / 14
Bibliography VIII
The Apache Spark developers. ML Pipelines. https://spark.apache.org/docs/latest/ml-pipeline.html, 2017.
The Apache Storm developers. Flux. http://storm.apache.org/releases/2.0.0-SNAPSHOT/flux.html, 2017.
YelpArchive. Pyleus. https://github.com/YelpArchive/pyleus, 2016.
13 / 14
Bibliography IX
Zabbix prediction triggers. https://www.zabbix.com/documentation/3.0/manual/config/triggers/prediction.
14 / 14