stormnyc04-140501100434-phpapp02
DESCRIPTION
How to build data pipelinesTRANSCRIPT
-
Data Pipelines :
Improving on the Lambda Architecture
Brian ONeill, CTO [email protected] @boneill42
-
Talk Breakdown
29%
20% 31%
20%
Topics
(1) MoIvaIon
(2) Polyglot Persistence
(3) AnalyIcs
(4) Lambda Architecture
-
Health Market Science - Then
What we were.
-
Health Market Science - Now
IntersecIng Big Data w/ Healthcare
Were fixing healthcare!
Data Manage
ment
-
Data Pipelines
I/O
-
The Input From government, state boards, etc.
From the internet,
social data, networks / graphs
From third-parIes,
medical claims From customers,
expenses, sales data, beneficiary informaIon, quality scores
Data Pipeline
-
The Output
Script
Claims
Expense
SancIon
Address
Contact (phone, fax, etc.)
Drug
RepresentaIve Division
Expense ManagerTM
Provider Verification
MarketViewTM
Customer Feed(s)
Customer Master
Provider MasterFileTM
CredenIals
Agile MDM 1 billion claims
per year
OrganizaIon
PracIIoner
Referrals
-
Sounds easy
Except... Incomplete Capture No foreign keys Differing schemas Changing schemas ConflicIng informaIon Ad-hoc Analysis (is hard) Point-In-Time Retrieval
-
Golden Record
Master Data Management
Harvested
Government
Private
faddress F@t0
flicense F@t5
fsanction F@t1 fsanction F@t4
Schema Change!
-
Why?
?s Sales OperaIons MarkeIng OpImizaIon
-
Our MDM Pipeline
- Data Stewardship - Data ScienIsts - Business Analysts
IngesIon - SemanIc Tagging - StandardizaIon - Data Mapping
IncorporaIon - ConsolidaIon - EnumeraIon - AssociaIon
Insight - Search - Reports - AnalyIcs
Feeds (mulIple formats, changing over Ime)
API / FTP Web Interface
Dimensions Logic Rules
-
Our first Pipeline
+
-
Sweet!
Dirt Simple
Lightning Fast Highly Available
Scalable MulI-Datacenter (DR)
-
Not Sweet.
How do we query the data?
NoSQL Indexes? Do such things exist?
-
Rev. 1 Wide Rows! AOP
Triggers!
Data model to support your
queries.
9 7 32 74 99 12 42
$3.50 $7.00 $8.75 $1.00 $4.20 $3.17 $8.88
ONC : PA : 19460
DOh! What about ad hoc?
-
TransformaIon
Rev 2 ElasIc Search! AOP
Triggers!
DOh! What if ES fails? What about schema / type informaBon?
-
Rev 3 - Apache Storm!
-
Polyglot Persistence The Right Tool for the Job
Oracle is a registered trademark of Oracle CorporaIon and/or its affiliates. Other names may be trademarks of their respecIve owners.
-
Back to the Pipeline
Kaoa DW
Storm
C* ES Titan SQL
-
Design Principles
What we got: At-least-once processing Simple data flows
What we needed to account for: Replays
Idempotent OperaBons! Immutable Data!
-
Cassandra State (v0.4.0)
[email protected]:hmsonline/storm-cassandra.git
{tuple} (ks, cf, row, k:v[])
Storm Cassandra
-
Trident ElasIc Search (v0.3.1)
[email protected]:hmsonline/trident-elasIcsearch.git
{tuple} (idx, docid, k:v[])
Storm ElasIc Search
-
Storm Graph (v0.1.2)
Coming soon to... [email protected]:hmsonline/storm-graph.git
for (tuple : batch) (graph, tuple)
-
Storm JDBI (v0.1.14)
INTERNAL ONLY (so far)
Worth releasing?
{tuple} (JDBC Statement)
-
All good!
-
But...
What was the average amount for a medical claim associated with procedure X by zip code over the last five years?
- Hadoop (
-
Lets Pre-Compute It!
stream .groupBy(new Field(ICD9)) .groupBy(new Field(zip)) .aggregate(new Field(amount), new Average())
DOh! GroupBys. They set data in moBon!
-
Lesson Learned
hzps://github.com/nathanmarz/storm/wiki/Trident-API-Overview
If possible, avoid re-parIIoning operaIons!
(e.g. LOG.error!)
-
Why so hard?
DOh! 19 != 9
What we dont want: LOCKS!
Whats the alternaIve?
CONSENSUS!
-
Cassandra 2.0!
hzp://www.slideshare.net/planetcassandra/nyc-jonathan-ellis-keynote-cassandra-12-20 hzp://www.cs.cornell.edu/courses/CS6452/2012sp/papers/paxos-complex.pdf
-
CondiIonal Updates The alert reader will notice here that Paxos gives us the ability to agree on exactly one proposal. After one has been accepted, it will be returned to future leaders in the promise, and the new leader will have to re-propose it again.hzp://www.datastax.com/dev/blog/lightweight-transacIons-in-cassandra-2-0
UPDATE value=9 WHERE word=fox IF value=6
-
Love CQL
CondiIonal Updates
+ Batch Statements
+ CollecIons
= BADASS DATA MODELS
-
Announcing : Storm Cassandra CQL!
[email protected]:hmsonline/storm-cassandra-cql.git
{tuple} (CQL Statement)
Trident Batching =? CQL Batching
-
CassandraCqlState public void commit(Long txid) {
BatchStatement batch = new BatchStatement(Type.LOGGED); batch.addAll(this.statements); clientFactory.getSession().execute(batch); }
public void addStatement(Statement statement) { this.statements.add(statement); } public ResultSet execute(Statement statement){ return clientFactory.getSession().execute(statement); }
-
CassandraCqlStateUpdater
public void updateState(CassandraCqlState state, List tuples, TridentCollector collector) {
for (TridentTuple tuple : tuples) { Statement statement = this.mapper.map(tuple); state.addStatement(statement); } }
-
ExampleMapper public Statement map(List keys, Number value) {
Insert statement = QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME);statement.value(KEY_NAME, keys.get(0));statement.value(VALUE_NAME, value);return statement;
}
public Statement retrieve(List keys) {Select statement = QueryBuilder.select()
.column(KEY_NAME).column(VALUE_NAME)
.from(KEYSPACE_NAME, TABLE_NAME)
.where(QueryBuilder.eq(KEY_NAME, keys.get(0))); return statement;}
-
Incremental State! Collapse aggregaIon into the state object.
This allows the state object to aggregate with current state in a loop unIl success.
Uses Trident Batching to perform in-memory aggregaIon for the batch.
for (tuple : batch)state.aggregate(tuple);
while (failed?) {persisted_state = read(state)aggregate(in_memory_state, persisted_state)failed? = conditionally_update(state)
}
-
ParIIon 1
In-Memory AggregaIon by Key!
Key Value
fox 6
brown 3
ParIIon 2
Key Value
fox 3
lazy 72 C*
No More GroupBy!
-
To protect against replays Use parBBon + batch idenBfier(s) in your condiBonal update!
BatchId + parIIonIndex consistently represents the same data as long as: 1.Any reparIIoning you do is determinisIc (so parIIonBy is, but shuffle is not)
2.You're using a spout that replays the exact same batch each Ime (which is true of transacIonal spouts but not of opaque transacIonal spouts)
- Nathan Marz
-
The Lambda Architecture
hzp://architects.dzone.com/arIcles/nathan-marzs-lamda
-
Lets Challenge This a Bit
because addiIonal tools and techniques cost money and Ime.
QuesIons: Can we solve the problem with a single tool and a single approach?
Can we re-use logic across layers? Or bezer yet, can we collapse layers?
-
A TradiIonal InterpretaIon
Speed Layer (Storm)
Batch Layer (Hadoop)
Data Stream
Serving Layer
HBase
Impala
DOh! Two pipelines!
-
IntegraIng Web Services
We need a web service that receives an event and provides, an immediate acknowledgement a high likelihood that the data is integrated very soon a guarantee that the data will be integrated eventually
We need an architecture that provides for, Code / Logic and approach re-use Fault-Tolerance
-
Grand Finale
-
The Idea : Embedding State!
Kaoa DropWizard
C*
IncrementalCqlState aggregate(tuple)
Batch Layer (Storm)
Client
-
The Sequence of Events
-
The Wins
Reuse AggregaIons and State Code! To re-compute (or backfill) a dimension, simply re-queue!
Storm is the safety net If a DW host fails during aggregaIon, Storm will fill in the gaps for all ACKd events.
Is there an opportunity to reuse more? BatchingStrategy & ParIIonStrategy?
-
In the end, all good. =)
-
Plug
The Book
Shout out: Taylor Goetz
-
Thanks
Brian ONeill, CTO [email protected] @boneill42