Cloud Native Data Pipelines (in English & Japanese) - QCon Tokyo
Cloud Native Data Pipelines
Sid Anand QCon Shanghai & Tokyo 2016
Japanese Translation: Kiro Harada (@haradakiro)
About Me
Work [ed | s] @ [logos]
Committer & PPMC on Apache Airflow
Father of 2
Co-Chair for [logo]
Live Stream
Agari : What We Do
Agari’s Previous EP Version (Batch)
[Diagram: Enterprise Customers → email metadata → Agari (apply trust models) → email md + trust score]
Agari’s Current EP Version (Near-real time, with Quarantine)
[Diagram: Enterprise Customers → email metadata → Agari (apply trust models) → email md + trust score]
Data Pipelines: BI vs Predictive
Data Pipelines (BI)
[Diagram: Web Servers → OLTP DB (MySQL, Oracle, Cassandra) → ETL (batch) → Data Warehouse (Teradata, Redshift, BigQuery) → Reporting Tools, Query Browsers]
Data Pipelines (Predictive)
[Diagram: Data Source → ETL (batch or streaming; Spark, Flink, Beam, Storm) → OLTP DB or cache (MySQL, Oracle, Cassandra, Redis) → Web Servers]
Data Products: Ranking (Search, News Feed), Recommender Products, Fraud Detection/Prevention
Data Products
BI | Predictive
Common Focus of this talk: Data Pipelines
Motivation: Cloud Native Data Pipelines
Big Data companies like LinkedIn, Facebook, Twitter, & Google build custom, large-scale data pipelines that run in their own data centers
Most start-ups run in the public cloud. Can they leverage aspects of the public cloud to build comparable pipelines?
Cloud Native Techniques + Open Source Technologies ~ Custom Data Pipeline Stacks seen in Big Data companies
Design Goals: Desirable Qualities of a Resilient Data Pipeline
Correctness
• Data Integrity (no loss, etc…)
• Expected data distributions
Timeliness
• All output within time-bound SLAs
Operability
• Fine-grained Monitoring & Alerting of Correctness & Timeliness SLAs
• Quick Recoverability
Cost
• Pay-as-you-go
Quickly Recoverable
• Bugs happen!
• Bugs in Predictive Data Pipelines have a large blast radius
• Optimize for MTTR
Predictive Analytics @ Agari: Use Cases
Use Cases (Enterprise Protect)
• Apply trust models (message scoring): batch + near real time
• Build trust models: batch
Use-Case : Message Scoring (batch): Batch Pipeline Architecture
[Diagram components: enterprise A, enterprise B, enterprise C, S3, Spark (EMR), S3, SNS, SQS, Importers (ASG), DB, Web App, Airflow]
• An Avro file is uploaded to S3 every 15 minutes
• Airflow kicks off a Spark message scoring job every hour (EMR)
• The Spark job writes scored messages and stats to another S3 bucket
• This triggers SNS/SQS message events
• An Auto Scaling Group (ASG) of Importers spins up when it detects SQS messages
• The importers rapidly ingest scored messages and aggregate statistics into the DB
• Users receive alerts of untrusted emails & can review them in the web app
• Airflow manages the entire process
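As a rough illustration of what "Airflow manages the entire process" implies, the stages of the batch flow can be written down as a dependency graph and executed in dependency order. The sketch below uses Python's standard-library graphlib (Python 3.9+), not Airflow's own API, and the stage names are hypothetical labels for the steps on the slides.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical stage names; each key maps to the set of stages it depends on.
BATCH_PIPELINE = {
    "upload_avro_to_s3": set(),
    "spark_scoring_job": {"upload_avro_to_s3"},
    "write_scored_to_s3": {"spark_scoring_job"},
    "notify_sns_sqs": {"write_scored_to_s3"},
    "importers_load_db": {"notify_sns_sqs"},
    "alert_users": {"importers_load_db"},
}


def run_order(dag):
    """Return the stages in an order that respects every dependency."""
    return list(TopologicalSorter(dag).static_order())


order = run_order(BATCH_PIPELINE)
```

An orchestrator adds scheduling, retries, and monitoring on top of this ordering; only the ordering itself is shown here.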
Tackling Cost & Timeliness: Leveraging the AWS Cloud
Tackling Cost
[Charts: Between Daily Runs | During Daily Runs]
When running daily, for 23 hours of a day, we didn’t pay for instances in the ASG or EMR
[Charts: Between Hourly Runs | During Hourly Runs]
This does not help when runs are hourly, since AWS charges at an hourly rate for EC2 instances!
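The cost argument can be checked with a little arithmetic. The sketch below assumes the hourly billing described in the talk (every started instance-hour is charged in full), non-overlapping runs, and hypothetical figures of 10 instances and 45-minute runs; none of these numbers come from the slides.

```python
import math


def billed_instance_hours_per_day(run_minutes, runs_per_day, instances):
    """Instance-hours billed per day when each started hour is charged in full.

    Assumes instances are launched for each run, terminated afterwards,
    and that runs do not overlap.
    """
    hours_per_run = math.ceil(run_minutes / 60)
    return hours_per_run * runs_per_day * instances


INSTANCES = 10                # hypothetical cluster size
ALWAYS_ON = 24 * INSTANCES    # instance-hours if the cluster never shuts down

daily = billed_instance_hours_per_day(45, runs_per_day=1, instances=INSTANCES)
hourly = billed_instance_hours_per_day(45, runs_per_day=24, instances=INSTANCES)
```

Under these assumptions a daily run is billed for 1 hour out of 24, while hourly runs are billed the same as an always-on cluster, which is the "no cost savings" case.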
Tackling Timeliness: Auto Scaling Group (ASG)
ASG - Overview
What is it?
• A means to automatically scale out/in clusters to handle variable load/traffic
• A means to keep a cluster/service of a fixed size always up
ASG - Data Pipeline
[Diagram: SQS → Importer ASG (importers, scale out / in) → DB]
ASG : CPU-based
[Chart: messages Sent, CPU, messages ACKd/Recvd]
CPU-based auto-scaling is good at scaling in/out to keep the average CPU constant
Premature Scale-in:
• The CPU drops to noise-levels before all messages are consumed
• This causes scale in to occur while the last few messages are still being committed
ASG : Queue-based
• Scale-out: When Visible Messages > 0 (a.k.a. when queue depth > 0). This causes the ASG to grow
• Scale-in: When Invisible Messages = 0 (a.k.a. when the last in-flight message is ACK’d). This causes the ASG to shrink
Shoyu Koto Da!!!! (Japanese pun on "sou iu koto da": "That’s how it is!")
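The queue-based rule can be sketched as a small decision function. This is a minimal sketch, not Agari's implementation: the function name, the capacity limits, and the extra requirement that no visible messages remain before scaling in are assumptions. For SQS, the two inputs correspond to the CloudWatch metrics ApproximateNumberOfMessagesVisible and ApproximateNumberOfMessagesNotVisible.

```python
def scaling_action(visible, in_flight, capacity, max_capacity):
    """Decide what the importer ASG should do, based on queue state only.

    visible   -- messages waiting in the queue (queue depth)
    in_flight -- messages received by a consumer but not yet ACK'd
    """
    if visible > 0:
        # Work is waiting: grow until the (hypothetical) ceiling is reached.
        return "scale_out" if capacity < max_capacity else "hold"
    if in_flight == 0 and capacity > 0:
        # Nothing waiting and the last in-flight message is ACK'd.
        return "scale_in"
    # Either messages are still being committed or the ASG is already empty.
    return "hold"
```

Unlike the CPU-based policy, this never shrinks the group while a message is still being committed, because the in-flight count only reaches zero after the final ACK.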
Desirable Qualities of a Resilient Data Pipeline
Correctness | Operability | Timeliness | Cost
Timeliness: • ASG • EMR Spark
Cost (Daily): • ASG • EMR Spark
Cost (Hourly): • No Cost Savings
Tackling Operability & Correctness: Leveraging Tooling
Tackling Operability : Requirements
• A simple way to author and manage workflows
• Provides visual insight into the state & performance of workflow runs
• Integrates with our alerting and monitoring tools
Apache Airflow: Workflow Automation & Scheduling
Apache Airflow - Authoring DAGs
Airflow: Author DAGs in Python! No need to bundle many config files!
Airflow: Visualizing a DAG
Apache Airflow - Managing DAGs
Airflow: It’s easy to manage multiple DAGs
Apache Airflow - Perf. Insights
Airflow: Gantt chart view reveals the slowest tasks for a run!
Airflow: Task Duration chart view shows task completion time trends!
Apache Airflow - Alerting
Airflow: …And easy to integrate with Ops tools!
Apache Airflow - Correctness
Desirable Qualities of a Resilient Data Pipeline
Correctness | Operability | Timeliness | Cost
Use-Case : Message Scoring (near-real time): NRT Pipeline Architecture
[Diagram components: enterprise A, enterprise B, enterprise C, Kinesis (K), Scorers (ASG), Kinesis, Importers (ASG), DB, Kinesis (K), Alerters (ASG), Quarantine Email]
• Kinesis batch put every second
• An ASG of scorers is scaled up to one process per core per Kinesis shard
• Scorers apply the trust model and send scored messages downstream
• An ASG of importers is scaled up to rapidly import messages
• Imported messages are also consumed by the alerter
Innovations: NRT Pipeline Architecture
Apache Avro: What is Avro?
Avro is a self-describing serialization format that supports
• primitive data types : int, long, boolean, float, string, bytes, etc…
• complex data types : records, arrays, unions, maps, enums, etc…
• many language bindings : Java, Scala, Python, Ruby, etc…
One of the most common formats for storing structured Big Data at rest in HDFS, S3, Google Cloud Storage, etc…
Supports Schema Evolution!
Avro Schema Example
{"namespace": "agari",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
• complex type (record)
• Schema name : User
• 3 fields in the record: 1 required, 2 optional
Avro Schema Data File Example
A data file stores the schema once, followed by the data records (x 1,000,000,000)
• Schema : 0.0001 % of the file
• Data : 99.999 % of the file
Avro Schema Streaming Example
A streamed message carries the schema together with a single binary data block
• Schema : 99 % of the message
• Data : 1 % of the message
OVERHEAD!!
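To make the streaming overhead concrete, the sketch below hand-encodes one User record following the binary encoding rules in the Avro specification (zigzag varints for int/long, length-prefixed UTF-8 strings, a branch index before each union value) and compares its size with the schema text. It is an illustrative encoder for this one schema only, not the official avro library, and the sample record is made up.

```python
import json

SCHEMA = {
    "namespace": "agari", "type": "record", "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
    ],
}


def encode_long(n):
    """Avro int/long: zigzag encode, then base-128 varint (low 7 bits first)."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s):
    """Avro string: byte length as a long, followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_user(user):
    """Encode one User record: field values in schema order, no field names."""
    out = encode_string(user["name"])
    for field, encode in (("favorite_number", encode_long),
                          ("favorite_color", encode_string)):
        value = user.get(field)
        if value is None:
            out += encode_long(1)                  # union branch 1 = "null"
        else:
            out += encode_long(0) + encode(value)  # union branch 0, then value
    return out


# Made-up sample record for illustration.
payload = encode_user({"name": "Al", "favorite_number": 7, "favorite_color": None})
schema_size = len(json.dumps(SCHEMA).encode("utf-8"))
```

The encoded record is only a handful of bytes because field names and types live in the schema, so shipping the schema with every streamed message dwarfs the data itself.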
Innovation 1 : Avro Schema Registry
[Diagram components: Message Producer (P), Schema Registry (Lambda), Message Consumer (C)]
• Message Producer calls register_schema with the Avro schema
• register_schema returns a UUID
• Message Producer sends UUID + Data to the Message Consumer
• Message Consumer calls getSchemaById (UUID) and receives the schema
• Message Consumers download & cache the schema, then decode the data
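The register / look-up / cache flow can be sketched with an in-memory stand-in for the registry. Everything here is an assumption made for illustration: the class names, the choice to return the same UUID when an identical schema is registered twice, and the lookups counter used to show that the consumer's cache avoids repeated calls. The real registry in the talk is Lambda-based and is not shown on the slides.

```python
import json
import uuid


class SchemaRegistry:
    """In-memory stand-in for the Lambda-backed registry (hypothetical)."""

    def __init__(self):
        self._by_id = {}
        self._by_text = {}
        self.lookups = 0  # counts get_schema_by_id calls, for illustration

    def register_schema(self, schema):
        """Store the schema and return its UUID (same UUID if already known)."""
        text = json.dumps(schema, sort_keys=True)
        if text not in self._by_text:
            schema_id = str(uuid.uuid4())
            self._by_text[text] = schema_id
            self._by_id[schema_id] = text
        return self._by_text[text]

    def get_schema_by_id(self, schema_id):
        self.lookups += 1
        return json.loads(self._by_id[schema_id])


class Consumer:
    """Fetches each schema once, then serves it from a local cache."""

    def __init__(self, registry):
        self._registry = registry
        self._cache = {}

    def schema_for(self, schema_id):
        if schema_id not in self._cache:
            self._cache[schema_id] = self._registry.get_schema_by_id(schema_id)
        return self._cache[schema_id]


registry = SchemaRegistry()
user_schema = {"type": "record", "name": "User",
               "fields": [{"name": "name", "type": "string"}]}
schema_id = registry.register_schema(user_schema)   # producer side
consumer = Consumer(registry)
first = consumer.schema_for(schema_id)               # one registry call
second = consumer.schema_for(schema_id)              # served from the cache
```

With this arrangement each message only needs to carry the UUID plus the binary data, and the registry is contacted once per schema per consumer.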
Innovation 1 : Avro Schema Registry
[Diagram: the NRT pipeline (enterprise A, enterprise B, enterprise C, Kinesis, Scorers ASG, Kinesis, Importers ASG, DB, Kinesis, Alerters ASG) with a Schema Registry (SR) attached to each ASG stage]
Innovation 2 : Repeatable Units
The Architecture is composed of repeated patterns of :
• ASG-based compute consumer
• Kinesis transport streams (i.e. AWS’ managed “Kafka”)
• A Lambda-based Avro Schema Registry
[Diagram: one unit = Kinesis i + Compute i (ASG i) + SR]
You can chain these repeatable units together to make arbitrary DAGs (Directed Acyclic Graphs)
Use HashiCorp’s Terraform to compose your DAG through automation
The example shown is a simple Linear DAG with 3 units
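The idea of chaining repeatable units can be sketched in plain Python, with an in-memory deque standing in for each Kinesis stream and a function standing in for each ASG's compute. The Unit class, the chain helper, and the three example stages are hypothetical; the Terraform-based provisioning mentioned in the talk is not modelled here.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, Optional


@dataclass
class Unit:
    """One repeatable unit: an input stream plus the compute that drains it."""
    name: str
    compute: Callable[[dict], dict]
    stream: Deque[dict] = field(default_factory=deque)  # stand-in for Kinesis
    downstream: Optional["Unit"] = None

    def drain(self) -> None:
        """Process every queued message and forward results downstream."""
        while self.stream:
            result = self.compute(self.stream.popleft())
            if self.downstream is not None:
                self.downstream.stream.append(result)


def chain(*units: Unit) -> Unit:
    """Wire units into a linear DAG and return the first one."""
    for upstream, downstream in zip(units, units[1:]):
        upstream.downstream = downstream
    return units[0]


delivered = []
scorer = Unit("scorer", lambda m: {**m, "score": 0.1})       # made-up score
importer = Unit("importer", lambda m: {**m, "imported": True})
alerter = Unit("alerter", lambda m: delivered.append(m) or m)

head = chain(scorer, importer, alerter)
head.stream.append({"id": 1})
for unit in (scorer, importer, alerter):
    unit.drain()
```

Because every unit has the same shape, adding a stage means adding one more Unit to the chain; a branching DAG would need a list of downstream units instead of a single one.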
Innovation 3 : Reactive-Scaling (WIP)
Airflow Job Reactively Scales
[Diagram: the NRT pipeline (Kinesis, Scorers ASG, Importers ASG, DB, Alerters ASG, SR) together with Airflow]
Innovation 4 : Anomaly-based Rollback (WIP)
[Diagram components: Compute 1 (ASG), Kinesis, Compute 2 (ASG), SR, Anomaly-detector & Reverter (ADR)]
If the ADR is triggered and a model build or code push was recently done to Compute 1, the ADR will revert the last code or model push to ASG Compute 1
Open Source Plans
The following will be open-sourced
• Avro Schema Registry
• Agari (Kinesis+ASG) scaling tool (Airflow Job)
• Anomaly-detector & Reverter
To be notified, follow @AgariEng & @r39132
Acknowledgments
None of this work would be possible without the contributions of the strong team below
• Vidur Apparao • Stephen Cattaneo • Jon Chase • Andrew Flury • William Forrester • Chris Haag • Mike Jones
• Scot Kennedy • Thede Loder • Paul Lorence • Kevin Mandich • Gabriel Ortiz • Jacob Rideout • Josh Yang • Julian Mehnle
Questions? (@r39132)