Data Pipelines : Improving on the Lambda Architecture Brian O’Neill, CTO [email protected] @boneill42

Post on 03-Oct-2015

DESCRIPTION

How to build data pipelines

TRANSCRIPT

  • Data Pipelines:

    Improving on the Lambda Architecture

    Brian O'Neill, CTO [email protected] @boneill42

  • Talk Breakdown

    [Pie chart: 29%, 20%, 31%, 20%]

    Topics

    (1) Motivation

    (2) Polyglot Persistence

    (3) Analytics

    (4) Lambda Architecture

  • Health Market Science - Then

    What we were.

  • Health Market Science - Now

    Intersecting Big Data w/ Healthcare

    We're fixing healthcare!

    Data Management

  • Data Pipelines

    I/O

  • The Input

    From government: state boards, etc.

    From the internet: social data, networks / graphs

    From third parties: medical claims

    From customers: expenses, sales data, beneficiary information, quality scores

    Data Pipeline

  • The Output

    Script

    Claims

    Expense

    Sanction

    Address

    Contact (phone, fax, etc.)

    Drug

    Representative Division

    Expense Manager™

    Provider Verification

    MarketView™

    Customer Feed(s)

    Customer Master

    Provider MasterFile™

    Credentials

    Agile MDM

    1 billion claims per year

    Organization

    Practitioner

    Referrals

  • Sounds easy

    Except...

    - Incomplete Capture
    - No foreign keys
    - Differing schemas
    - Changing schemas
    - Conflicting information
    - Ad-hoc Analysis (is hard)
    - Point-In-Time Retrieval

  • Golden Record

    Master Data Management

    Harvested

    Government

    Private

    faddress F@t0

    flicense F@t5

    fsanction F@t1 fsanction F@t4

    Schema Change!

  • Why?

    ?s Sales Operations Marketing Optimization

  • Our MDM Pipeline

    - Data Stewardship - Data Scientists - Business Analysts

    Ingestion - Semantic Tagging - Standardization - Data Mapping

    Incorporation - Consolidation - Enumeration - Association

    Insight - Search - Reports - Analytics

    Feeds (multiple formats, changing over time)

    API / FTP Web Interface

    Dimensions Logic Rules

  • Our first Pipeline

    +

  • Sweet!

    Dirt Simple

    Lightning Fast Highly Available

    Scalable Multi-Datacenter (DR)

  • Not Sweet.

    How do we query the data?

    NoSQL Indexes? Do such things exist?

  • Rev. 1 Wide Rows!

    AOP Triggers!

    Data model to support your queries.

    9 7 32 74 99 12 42

    $3.50 $7.00 $8.75 $1.00 $4.20 $3.17 $8.88

    ONC : PA : 19460

    D'Oh! What about ad hoc?

  • Transformation

    Rev 2 Elastic Search!

    AOP Triggers!

    D'Oh! What if ES fails? What about schema / type information?

  • Rev 3 - Apache Storm!

  • Polyglot Persistence

    The Right Tool for the Job

    Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners.

  • Back to the Pipeline

    Kafka DW

    Storm

    C* ES Titan SQL

  • Design Principles

    What we got:
    - At-least-once processing
    - Simple data flows

    What we needed to account for:
    - Replays

    Idempotent Operations! Immutable Data!
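
    To make the replay concern concrete, here is a minimal sketch in plain Java (no Storm involved) of why at-least-once delivery pushes you toward idempotent writes of immutable events. The class name, the event ids and the amounts are all made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a store fed by an at-least-once pipeline.
class ReplayDemo {
    // Non-idempotent: every delivery adds to the total, so a replay is double-counted.
    static double sumByIncrement(List<String[]> deliveries) {
        double total = 0.0;
        for (String[] d : deliveries) {
            total += Double.parseDouble(d[1]);
        }
        return total;
    }

    // Idempotent: each immutable event is written under its own id, so a
    // second delivery overwrites the same entry with the same value.
    static double sumByUpsert(List<String[]> deliveries) {
        Map<String, Double> byEventId = new HashMap<>();
        for (String[] d : deliveries) {
            byEventId.put(d[0], Double.parseDouble(d[1]));
        }
        double total = 0.0;
        for (double v : byEventId.values()) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // claim-2 is delivered twice, as it would be after a replay.
        List<String[]> deliveries = Arrays.asList(
            new String[] {"claim-1", "3.50"},
            new String[] {"claim-2", "7.00"},
            new String[] {"claim-2", "7.00"});
        System.out.println("increment: " + sumByIncrement(deliveries));
        System.out.println("upsert:    " + sumByUpsert(deliveries));
    }
}
```

    The increment path counts the replayed claim twice, while the upsert path is unaffected by it.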

  • Cassandra State (v0.4.0)

    [email protected]:hmsonline/storm-cassandra.git

    {tuple} (ks, cf, row, k:v[])

    Storm Cassandra

  • Trident Elastic Search (v0.3.1)

    [email protected]:hmsonline/trident-elasticsearch.git

    {tuple} (idx, docid, k:v[])

    Storm Elastic Search

  • Storm Graph (v0.1.2)

    Coming soon to... [email protected]:hmsonline/storm-graph.git

    for (tuple : batch) (graph, tuple)

  • Storm JDBI (v0.1.14)

    INTERNAL ONLY (so far)

    Worth releasing?

    {tuple} (JDBC Statement)

  • All good!

  • But...

    What was the average amount for a medical claim associated with procedure X by zip code over the last five years?

  • Hadoop (
  • Let's Pre-Compute It!

    stream
        .groupBy(new Fields("ICD9"))
        .groupBy(new Fields("zip"))
        .aggregate(new Fields("amount"), new Average())

    D'Oh! GroupBys. They set data in motion!
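
    As a rough illustration of what "pre-computing" the average means, the sketch below keeps a running sum and count per (ICD9, zip) key in plain Java. It is not Trident code; the class name and sample values are hypothetical. Keeping sum and count, rather than the average itself, is what lets partial results be merged later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical running-average aggregate keyed by procedure code and zip.
class AverageByKey {
    // For each key: element 0 is the running sum, element 1 the running count.
    private final Map<String, double[]> sums = new HashMap<>();

    void add(String icd9, String zip, double amount) {
        double[] acc = sums.computeIfAbsent(icd9 + "|" + zip, k -> new double[2]);
        acc[0] += amount;
        acc[1] += 1;
    }

    double average(String icd9, String zip) {
        double[] acc = sums.get(icd9 + "|" + zip);
        if (acc == null) {
            throw new IllegalArgumentException("no claims recorded for this key");
        }
        return acc[0] / acc[1];
    }

    public static void main(String[] args) {
        AverageByKey agg = new AverageByKey();
        // Made-up claims: two in one zip code, one in another.
        agg.add("250.00", "19460", 100.0);
        agg.add("250.00", "19460", 300.0);
        agg.add("250.00", "19104", 50.0);
        System.out.println(agg.average("250.00", "19460"));
        System.out.println(agg.average("250.00", "19104"));
    }
}
```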

  • Lesson Learned

    https://github.com/nathanmarz/storm/wiki/Trident-API-Overview

    If possible, avoid re-partitioning operations!

    (e.g. LOG.error!)

  • Why so hard?

    D'Oh! 19 != 9

    What we don't want: LOCKS!

    What's the alternative?

    CONSENSUS!

  • Cassandra 2.0!

    http://www.slideshare.net/planetcassandra/nyc-jonathan-ellis-keynote-cassandra-12-20

    http://www.cs.cornell.edu/courses/CS6452/2012sp/papers/paxos-complex.pdf

  • Conditional Updates

    The alert reader will notice here that Paxos gives us the ability to agree on exactly one proposal. After one has been accepted, it will be returned to future leaders in the promise, and the new leader will have to re-propose it again.

    http://www.datastax.com/dev/blog/lightweight-transactions-in-cassandra-2-0

    UPDATE <table> SET value=9 WHERE word='fox' IF value=6
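
    The semantics of a conditional update can be sketched without a database. Below, a ConcurrentHashMap stands in for the Cassandra table, and its compare-and-replace call plays the role of the IF clause. This only illustrates the "update if the value is still what I read" contract. It does not use Paxos or the Cassandra driver, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for "UPDATE ... SET value=new WHERE word=key IF value=expected".
class ConditionalUpdateDemo {
    static boolean conditionalUpdate(ConcurrentHashMap<String, Integer> table,
                                     String key, int expected, int newValue) {
        // Atomically replaces the value only if it still equals `expected`.
        return table.replace(key, expected, newValue);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> table = new ConcurrentHashMap<>();
        table.put("fox", 6);

        // Writer A read 6 and adds 3: the condition holds, so the write applies.
        boolean a = conditionalUpdate(table, "fox", 6, 9);
        // Writer B also read 6 earlier; its condition is now stale, so it is rejected.
        boolean b = conditionalUpdate(table, "fox", 6, 16);

        System.out.println("A applied: " + a);
        System.out.println("B applied: " + b);
        System.out.println("fox = " + table.get("fox"));
    }
}
```

    A rejected writer has to re-read the current value and try again, which avoids both locks and silent lost updates.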

  • Love CQL

    Conditional Updates

    + Batch Statements

    + Collections

    = BADASS DATA MODELS

  • Announcing : Storm Cassandra CQL!

    [email protected]:hmsonline/storm-cassandra-cql.git

    {tuple} (CQL Statement)

    Trident Batching =? CQL Batching

  • CassandraCqlState

    public void commit(Long txid) {
        BatchStatement batch = new BatchStatement(Type.LOGGED);
        batch.addAll(this.statements);
        clientFactory.getSession().execute(batch);
    }

    public void addStatement(Statement statement) {
        this.statements.add(statement);
    }

    public ResultSet execute(Statement statement) {
        return clientFactory.getSession().execute(statement);
    }

  • CassandraCqlStateUpdater

    public void updateState(CassandraCqlState state, List<TridentTuple> tuples, TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            Statement statement = this.mapper.map(tuple);
            state.addStatement(statement);
        }
    }

  • ExampleMapper

    public Statement map(List keys, Number value) {
        Insert statement = QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME);
        statement.value(KEY_NAME, keys.get(0));
        statement.value(VALUE_NAME, value);
        return statement;
    }

    public Statement retrieve(List keys) {
        Select statement = QueryBuilder.select()
            .column(KEY_NAME).column(VALUE_NAME)
            .from(KEYSPACE_NAME, TABLE_NAME)
            .where(QueryBuilder.eq(KEY_NAME, keys.get(0)));
        return statement;
    }

  • Incremental State!

    Collapse aggregation into the state object.

    This allows the state object to aggregate with current state in a loop until success.

    Uses Trident Batching to perform in-memory aggregation for the batch.

    for (tuple : batch)
        state.aggregate(tuple);

    while (failed?) {
        persisted_state = read(state)
        aggregate(in_memory_state, persisted_state)
        failed? = conditionally_update(state)
    }
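
    The loop above can be sketched in runnable form. This is an assumption-laden illustration, not the storm-cassandra-cql implementation: a ConcurrentHashMap stands in for Cassandra, its putIfAbsent/replace calls stand in for conditional updates, and the class name IncrementalStateSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical state object: aggregates a batch in memory, then merges it
// into the shared store with a read / aggregate / conditional-update loop.
class IncrementalStateSketch {
    private final ConcurrentHashMap<String, Long> store; // stand-in for C*
    private final Map<String, Long> inMemory = new HashMap<>();

    IncrementalStateSketch(ConcurrentHashMap<String, Long> store) {
        this.store = store;
    }

    // Step 1: fold each tuple of the batch into the in-memory aggregate.
    void aggregate(String key, long value) {
        inMemory.merge(key, value, Long::sum);
    }

    // Step 2: for each key, retry until the conditional update succeeds.
    void commit() {
        for (Map.Entry<String, Long> e : inMemory.entrySet()) {
            boolean applied = false;
            while (!applied) {
                Long persisted = store.get(e.getKey());
                if (persisted == null) {
                    // Equivalent of "insert if not exists".
                    applied = store.putIfAbsent(e.getKey(), e.getValue()) == null;
                } else {
                    // Equivalent of "update ... if value = persisted".
                    applied = store.replace(e.getKey(), persisted, persisted + e.getValue());
                }
            }
        }
        inMemory.clear();
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();
        IncrementalStateSketch partition1 = new IncrementalStateSketch(store);
        IncrementalStateSketch partition2 = new IncrementalStateSketch(store);
        partition1.aggregate("fox", 6);
        partition1.aggregate("brown", 3);
        partition2.aggregate("fox", 3);
        partition2.aggregate("lazy", 72);
        partition1.commit();
        partition2.commit();
        System.out.println(store);
    }
}
```

    Because each partition merges its own partial aggregate into the store, no groupBy (and so no re-partitioning) is needed to bring all values for a key together.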

  • In-Memory Aggregation by Key!

    Partition 1

    Key Value

    fox 6

    brown 3

    Partition 2

    Key Value

    fox 3

    lazy 72

    C*

    No More GroupBy!

  • To protect against replays

    Use partition + batch identifier(s) in your conditional update!

    BatchId + partitionIndex consistently represents the same data as long as:

    1. Any repartitioning you do is deterministic (so partitionBy is, but shuffle is not)

    2. You're using a spout that replays the exact same batch each time (which is true of transactional spouts but not of opaque transactional spouts)

    - Nathan Marz
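
    One way to read this advice is sketched below: store, next to each aggregate, the (batchId, partitionIndex) identifiers already folded into it, and make the conditional update a no-op when the identifier is already present. This is an assumption about how the identifiers could be used, not the library's actual schema. The ConcurrentHashMap again stands in for Cassandra, and all names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical replay-safe aggregate keyed by word.
class ReplaySafeState {
    // Immutable row: the aggregate plus the contributions already applied.
    static final class Row {
        final long value;
        final Set<String> applied;

        Row(long value, Set<String> applied) {
            this.value = value;
            this.applied = applied;
        }
    }

    private final ConcurrentHashMap<String, Row> store = new ConcurrentHashMap<>();

    // Returns true if the delta was applied, false if this (batch, partition)
    // contribution had already been applied, i.e. the call is a replay.
    boolean apply(String key, long delta, long batchId, int partitionIndex) {
        String id = batchId + ":" + partitionIndex;
        while (true) {
            Row current = store.get(key);
            if (current != null && current.applied.contains(id)) {
                return false;
            }
            Set<String> applied = new HashSet<>();
            if (current != null) {
                applied.addAll(current.applied);
            }
            applied.add(id);
            Row next = new Row((current == null ? 0L : current.value) + delta, applied);
            // Conditional write: only succeeds if the row is unchanged since the read.
            boolean ok = (current == null)
                ? store.putIfAbsent(key, next) == null
                : store.replace(key, current, next);
            if (ok) {
                return true;
            }
        }
    }

    long get(String key) {
        Row row = store.get(key);
        return row == null ? 0L : row.value;
    }

    public static void main(String[] args) {
        ReplaySafeState state = new ReplaySafeState();
        state.apply("fox", 6, 1L, 0);
        state.apply("fox", 3, 1L, 1);
        state.apply("fox", 6, 1L, 0); // replay of batch 1, partition 0
        System.out.println(state.get("fox"));
    }
}
```

    In this sketch the set of applied identifiers grows without bound, so a real table would need some way to prune old ones.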

  • The Lambda Architecture

    http://architects.dzone.com/articles/nathan-marzs-lamda

  • Let's Challenge This a Bit

    because additional tools and techniques cost money and time.

    Questions: Can we solve the problem with a single tool and a single approach?

    Can we re-use logic across layers? Or better yet, can we collapse layers?

  • A Traditional Interpretation

    Speed Layer (Storm)

    Batch Layer (Hadoop)

    Data Stream

    Serving Layer

    HBase

    Impala

    D'Oh! Two pipelines!

  • Integrating Web Services

    We need a web service that receives an event and provides:
    - an immediate acknowledgement
    - a high likelihood that the data is integrated very soon
    - a guarantee that the data will be integrated eventually

    We need an architecture that provides for:
    - Code / Logic and approach re-use
    - Fault-Tolerance

  • Grand Finale

  • The Idea : Embedding State!

    Kafka DropWizard

    C*

    IncrementalCqlState aggregate(tuple)

    Batch Layer (Storm)

    Client

  • The Sequence of Events

  • The Wins

    Reuse Aggregations and State Code! To re-compute (or backfill) a dimension, simply re-queue!

    Storm is the safety net: if a DW host fails during aggregation, Storm will fill in the gaps for all ACK'd events.

    Is there an opportunity to reuse more? BatchingStrategy & PartitionStrategy?
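
    The "safety net" idea can be illustrated with a toy model. Everything here is a hypothetical stand-in: a list plays the queue, one State class is shared by the web-service path and the replay path, and idempotence is keyed on an event id. It shows only the shape of the argument: acknowledge after the durable append, aggregate inline when possible, and let a replay of the log fill any gaps.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class EmbeddedStateSketch {
    static final class Event {
        final String id;
        final long amount;

        Event(String id, long amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    // Aggregation code shared by both paths; applying an event twice is a no-op.
    static final class State {
        private final Set<String> seen = new HashSet<>();
        private long total;

        void aggregate(Event e) {
            if (seen.add(e.id)) {
                total += e.amount;
            }
        }

        long total() {
            return total;
        }
    }

    private final List<Event> log = new ArrayList<>(); // stand-in for the queue
    private final State state = new State();

    // Web-service path: append to the log, then acknowledge. Inline
    // aggregation is best effort and may be skipped if the host fails.
    boolean receive(Event e, boolean hostFailsBeforeAggregating) {
        log.add(e);
        if (!hostFailsBeforeAggregating) {
            state.aggregate(e);
        }
        return true; // the acknowledgement
    }

    // Safety-net path: replay every acknowledged event through the same code.
    void replayLog() {
        for (Event e : log) {
            state.aggregate(e);
        }
    }

    long total() {
        return state.total();
    }

    public static void main(String[] args) {
        EmbeddedStateSketch service = new EmbeddedStateSketch();
        service.receive(new Event("e1", 10), false);
        service.receive(new Event("e2", 5), true); // acknowledged but not aggregated
        System.out.println("before replay: " + service.total());
        service.replayLog();
        System.out.println("after replay:  " + service.total());
    }
}
```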

  • In the end, all good. =)

  • Plug

    The Book

    Shout out: Taylor Goetz

  • Thanks

    Brian O'Neill, CTO [email protected] @boneill42