end-to-end "exactly-once" processing using apache apex (next gen hadoop)

End to End Exactly Once in Apache Apex Ajay Gupta Software Engineer@DataTorrent & Apex Contributor

Upload: datatorrent

Post on 20-Mar-2017


TRANSCRIPT

Page 1: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

End to End Exactly Once in Apache Apex

Ajay Gupta

Software Engineer @ DataTorrent & Apex Contributor

Page 2: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Agenda

• Introduction to Apex
• Fault Tolerance in Apex
  • Checkpointing
  • Buffer Server
• Idempotency
• End to End Exactly Once in Apache Apex
• Conclusion & Questions

Page 3: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Apache Apex

• In-memory, distributed stream processing
  • Application logic broken into components called operators that run in a distributed fashion across your cluster
• Natural programming model
  • Unobtrusive Java API to express (custom) logic
  • Maintain state and metrics in your member variables
• Scalable, high throughput, low latency
  • Operators can be scaled up or down at runtime according to the load and SLA
  • Dynamic scaling (elasticity), compute locality
• Fault tolerance & correctness
  • Automatically recover from node outages without having to reprocess from the beginning
  • State is preserved, checkpointing, incremental recovery
  • End-to-end exactly-once
• Operability
  • System and application metrics, record/visualize data
  • Dynamic changes

Page 4: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

DataTorrent Vision - Productize Big Data

• Big Data is neither Productized nor Operationalized
• Total Cost of Ownership (TCO) includes
  • Time to Develop + Time to Launch + Cost of ongoing Operations
• Provide a Product to ...
  • Build Applications Rapidly with Simple Interfaces, Pre-Built Apps, Code Reuse & Debuggability
  • Support Dev, Test, Prod cycle to Launch Apps quickly
  • Manage and Visualize Applications for Operability

Page 5: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Next Gen Big Data Applications

[Figure: example pipeline with the stages Browser, Web Server, Logs, Kafka, Kafka Input (logs), Decompress/Parse/Filter, Dimensions Aggregate, Kafka]

• Variety of sources: IoT, Kafka, files, social media etc.
• Variety of sinks: Kafka, files, databases etc. (* Supports low latency real time visualizations as well)
• Unbounded and continuous data streams
• Batch support, batch processed as stream
• In-memory processing with temporal window boundaries
• Stateful operations: Aggregation, Rules etc --> Analytics

Page 6: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Big Data Ecosystem: Where DataTorrent fits

[Figure: Data Sources (Sensor Data, Social Media, Web Servers, App Servers, Click Streams), operators Oper1, Oper2 and Oper3 running on Hadoop (YARN + HDFS), and outputs labelled Real-time analytics & Visualizations and Real-time Data Visualization]

Page 7: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Apache Apex and DataTorrent Product Stack

[Figure: layered product stack with the following labels]

• Solutions for Business Problems: Ingestion & Data Prep, ETL Pipelines
• Ease of Use Tools: Real-Time Data Visualization, Management & Monitoring, GUI Application Assembly
• Application Templates (connector labels: Kafka, HDFS, JDBC)
• Apex-Malhar Operator Library
• High-level API: Transformation, ML & Score, SQL, Analytics
• FileSync, Dev Framework, Batch Support
• Core: Apache Apex Core
• Big Data Infrastructure: Hadoop 2.x, YARN + HDFS, On Premise & Cloud

Page 8: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Application Development Model

▪ A Stream is a sequence of data tuples
▪ A typical Operator takes one or more input streams, performs computations & emits one or more output streams
  • Each Operator is YOUR custom business logic in Java, or a built-in operator from our open source library
  • Operator has many instances that run in parallel and each instance is single-threaded
▪ Directed Acyclic Graph (DAG) is made up of operators and streams

[Figure: Directed Acyclic Graph (DAG) of six Operators connected by streams of Tuples, labelled Filtered Stream, Enriched Stream and Output Stream]

Page 9: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Native Hadoop Integration

• YARN is the resource manager

• HDFS for storing persistent state

• StrAM => Application Master for Apex

Page 10: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Apache Apex - Fault Tolerance

• Machine failures are common in a distributed system.
• Process the entire stream again?
• Apex guarantees no loss of data and computational state
  • Checkpointing
  • Buffer Server

Page 11: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Operator State, Checkpointing

• Operator state can change as tuples pass through it, e.g. a CountOperator may maintain a totalCount variable whose value changes continuously
• Apex periodically checkpoints operator state to HDFS so that the operator can be recovered in case of failure.
• The platform decides the windowId to which an operator should be recovered, based on application state.
• State of StrAM is also checkpointed
• Data output by an operator is not checkpointed
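
The checkpoint-and-restore cycle described above can be sketched in plain Java. This is only an illustration under stated assumptions: `CountOperator` here is a hypothetical class rather than an Apex operator, `CheckpointSketch` and its methods are invented names, and Java serialization to a byte array stands in for whatever the platform actually writes to HDFS.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for an operator; it does not implement the Apex Operator interface.
class CountOperator implements Serializable {
    private static final long serialVersionUID = 1L;

    // Operator state: changes with every tuple and is captured by a checkpoint.
    long totalCount;

    void process(Object tuple) {
        totalCount++;
    }
}

class CheckpointSketch {
    // Snapshot the operator state; the platform would write such a snapshot to HDFS.
    static byte[] checkpoint(CountOperator op) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(op);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuild an operator from a snapshot, as a redeployed instance would after a failure.
    static CountOperator restore(byte[] snapshot) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return (CountOperator) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Tuples processed after the snapshot are not part of the restored state, which is why the windows following the checkpoint have to be replayed to the recovered operator.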

Page 12: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Fault Tolerance - Checkpointing

● Assume checkpoint done every 20 windows

Operator                  O1    O2    O3    O4    O5
currentWindowId           125   110   105   92    77
lastCheckpointWindowId    120   100   100   80    60

Page 13: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Fault Tolerance - Checkpointing

Operator                  O1    O2    O3    O4    O5
currentWindowId           125   110   105   92    77
lastCheckpointWindowId    120   100   100   80    60

● O3 gets KILLED
● O4 and O5 (downstream operators) are redeployed
● O3, O4 and O5 will restart from window ID 61
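
One way to read the numbers in this example: the failed operator and everything downstream of it restart one window past the oldest checkpoint in that group, so that every operator in the group has saved state to resume from. The sketch below encodes only that reading. `RecoveryWindowSketch` and `restartWindow` are hypothetical names, window ids are treated as plain consecutive integers as on the slide, and the platform's actual decision takes more application state into account.

```java
import java.util.List;

class RecoveryWindowSketch {
    // Restart window for a failed operator and its downstream operators:
    // one past the oldest lastCheckpointWindowId among them.
    static long restartWindow(List<Long> lastCheckpointWindowIds) {
        if (lastCheckpointWindowIds.isEmpty()) {
            throw new IllegalArgumentException("at least one operator is required");
        }
        long oldest = Long.MAX_VALUE;
        for (long id : lastCheckpointWindowIds) {
            oldest = Math.min(oldest, id);
        }
        return oldest + 1;
    }
}
```

For O3, O4 and O5 above, `restartWindow(Arrays.asList(100L, 80L, 60L))` returns 61, matching the slide.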

Page 14: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Fault Tolerance - Checkpointing

Operator                  O1    O2    O3    O4    O5
currentWindowId           125   110   61    61    61
lastCheckpointWindowId    120   100   60    60    60

How will O3 get input for Window 61 when O2 is at 110?

Page 15: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Fault Tolerance - Buffer Server

Operator                  O1    O2    O3    O4    O5
currentWindowId           125   110   61    61    61
lastCheckpointWindowId    120   100   60    60    60

[Figure: Buffer Server shown between O2 and O3]

● Buffer Server: Pub-Sub message queue (in memory with spooling to disk)
● Resides between each operator pair, at the source (O2 above)
● O2 publishes to Buffer Server, O3 subscribes to Buffer Server
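
The publish/subscribe-with-replay idea can be sketched as a small in-memory class. This is a toy model, not the Apex buffer server: `BufferServerSketch`, `publish` and `replayFrom` are hypothetical names, tuples are plain strings, and spooling to disk is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class BufferServerSketch {
    // Tuples published by the upstream operator, grouped by window id and kept in window order.
    private final SortedMap<Long, List<String>> windows = new TreeMap<>();

    // Called by the upstream operator (O2 in the example) for every tuple it emits.
    void publish(long windowId, String tuple) {
        windows.computeIfAbsent(windowId, id -> new ArrayList<>()).add(tuple);
    }

    // Called for a recovering subscriber (O3): returns every retained tuple
    // from the requested window onward, in the order it was published.
    List<String> replayFrom(long windowId) {
        List<String> replayed = new ArrayList<>();
        for (List<String> tuples : windows.tailMap(windowId).values()) {
            replayed.addAll(tuples);
        }
        return replayed;
    }
}
```

Because the queue sits on the publisher's side, O2 can keep running at window 110 while a redeployed O3 asks for window 61 onward.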

Page 16: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Apache Apex - At Least Once

• Inherently, Apex supports at least once
• Each input tuple will flow from start to end of the application DAG at least once.
  • Possible duplicates
  • No missing data
• The operator is brought back to its latest checkpointed state and the upstream buffer server replays all subsequent windows

Page 17: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Apache Apex - Idempotency

• Replayed windows after a failure should contain the same data as before the failure
• With every window, meta information is stored which helps in identifying what data was sent in this window

Page 18: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Apache Apex - Idempotency

[Figure: a stream of Numbers flows into the Max operator, which sends the Maximum till now to the Console operator]

If Max operator dies at window 81 and restarts at window 81:

Scenario                            Window 80        Window 81   Result
Before failure                      21, 25, 12       32, 45      Max till W80: 25
After recovery, no idempotency      45, 21, 25, 12   32          Max till W80: 25, Max after W81: 32
After recovery, with idempotency    21, 25, 12       32, 45      Max till W80: 25, Max after W81: 45
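
The difference between the two recovery cases comes down to what the replayed window 81 contains. A minimal sketch, assuming the Max operator is restored with the checkpointed maximum of 25 and then processes one replayed window (`MaxOperatorSketch` and `maxAfterReplay` are hypothetical names used only for this illustration):

```java
import java.util.List;

class MaxOperatorSketch {
    // Start from the checkpointed maximum, then process the tuples of one replayed window.
    static int maxAfterReplay(int checkpointedMax, List<Integer> replayedWindow) {
        int max = checkpointedMax;
        for (int n : replayedWindow) {
            max = Math.max(max, n);
        }
        return max;
    }
}
```

With idempotent replay, window 81 again holds 32 and 45, so `maxAfterReplay(25, Arrays.asList(32, 45))` returns 45. Without it, window 81 holds only 32, the call returns 32, and the 45 never reaches the restored operator.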

Page 19: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

End to End Exactly Once

• No distributed system can support strict exactly once
• Important when dealing with external systems like databases.
• Data should not be duplicated or lost
• Platform support for at least once is a must
• End result of exactly once for the application can be achieved via
  • At least once
  • Idempotency
  • Operator logic - for external operators

Page 20: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Operator Logic - Output Operators

• Need to avoid duplicates when writing to external systems, e.g. databases
• Operator (in our example) uses transactions.
• Stores window id in a separate table.
• On recovery and replay, the stored window id can be used to detect what was already processed and skip it instead of writing duplicates.

Page 21: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Operator Logic - Output Operators

• Data in a window is written out in a single transaction
• Window id is also written to a meta table as part of the same transaction
• Operator reads the window id from the meta table on recovery
• Ignores data for windows up to and including the recovered window id and writes new data
• Partial window data before failure will not appear in the data table as the transaction was not committed
• Assumes idempotency for replay

[Figure: Data Table with rows d11 d12 d13, d21 d22 d23, lwn1 lwn2 lwn3 and Lwn+11 Lwn+12 Lwn+13; Meta Table with entries (op-id, wn) and (op-id, wn+1); a checkpoint (chk) marked ahead of windows wn and wn+1]
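
The steps above can be sketched with an in-memory stand-in for the database. Everything here is an assumption made for illustration: `FakeStore` replaces a real database and its `commit` method models a single transaction that writes the rows and the window id together, and `ExactlyOnceOutputSketch` with its `beginWindow`/`process`/`endWindow` methods is a hypothetical class, not an operator from the Apex library. The sketch treats the stored id as the last committed window, so that window is skipped on replay as well.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a database holding a data table and a meta table.
class FakeStore {
    final List<String> dataTable = new ArrayList<>();
    long committedWindowId = -1; // meta table: id of the last fully written window

    // Models one transaction: rows and window id become visible together or not at all.
    void commit(long windowId, List<String> rows) {
        dataTable.addAll(rows);
        committedWindowId = windowId;
    }
}

class ExactlyOnceOutputSketch {
    private final FakeStore store;
    private final long recoveredWindowId;
    private final List<String> pending = new ArrayList<>();
    private long currentWindowId;

    ExactlyOnceOutputSketch(FakeStore store) {
        this.store = store;
        // On (re)deployment, read the last committed window id from the meta table.
        this.recoveredWindowId = store.committedWindowId;
    }

    void beginWindow(long windowId) {
        currentWindowId = windowId;
        pending.clear(); // uncommitted rows of an earlier window never reach the store
    }

    void process(String tuple) {
        // Replayed windows that were already committed are skipped.
        if (currentWindowId > recoveredWindowId) {
            pending.add(tuple);
        }
    }

    void endWindow() {
        if (currentWindowId > recoveredWindowId) {
            store.commit(currentWindowId, new ArrayList<>(pending));
        }
    }
}
```

If the operator dies in the middle of a window, nothing from that window has been committed, so a redeployed instance that replays the same window (idempotently) writes it once and only once.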

Page 22: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Idempotency for External Operators

[Figure: the KafkaInput operator sends Numbers to the Output operator, which writes to the Database]

If Kafka Input operator dies at window 81 and restarts at window 81:

Scenario                            Window 80        Window 81   Result
Before failure                      21, 25, 12       32, 45      Items persisted till W80: 21, 25, 12
After recovery, no idempotency      45, 21, 25, 12   32          Items persisted till W80: 21, 25, 12; till W81: 21, 25, 12, 32
After recovery, with idempotency    21, 25, 12       32, 45      Items persisted till W80: 21, 25, 12; till W81: 21, 25, 12, 45, 32

Page 23: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Idempotency for External Operators

• Operators interacting with the external world have no Buffer Server for idempotency.
• Operators need to implement logic to support idempotency
• Platform provides a utility: WindowDataManager
• WindowDataManager: can be used to replay tuples in the input operator after re-deployment, for a window which was not checkpointed but whose processing was completed before the failure.
• E.g. for KafkaInput, we can store start and end offsets for each window
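
The offset bookkeeping mentioned for KafkaInput can be sketched as follows. This does not use the WindowDataManager API: `OffsetReplaySketch`, `OffsetRange`, `save` and `replay` are hypothetical names, the source is modelled as an in-memory list indexed by offset, and the per-window records live in a map where a real operator would need durable storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetReplaySketch {
    // Hypothetical record of what one window read from the source (offsets are inclusive).
    static final class OffsetRange {
        final int start;
        final int end;

        OffsetRange(int start, int end) {
            this.start = start;
            this.end = end;
        }
    }

    // Stand-in for durable per-window metadata; it must survive the operator's own failure.
    private final Map<Long, OffsetRange> completedWindows = new HashMap<>();

    // Called at the end of a window, once all of its tuples have been emitted.
    void save(long windowId, int startOffset, int endOffset) {
        completedWindows.put(windowId, new OffsetRange(startOffset, endOffset));
    }

    // After redeployment: re-emit exactly the tuples the window contained before the failure.
    // Returns an empty list if the window was never completed, so it can be read fresh.
    List<Integer> replay(long windowId, List<Integer> sourceLog) {
        OffsetRange range = completedWindows.get(windowId);
        if (range == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(sourceLog.subList(range.start, range.end + 1));
    }
}
```

With a source log of 21, 25, 12, 32, 45 and saved ranges 0 to 2 for window 80 and 3 to 4 for window 81, replaying window 81 yields 32 and 45 again, which is the "with idempotency" row of the earlier example.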

Page 24: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

End to End Exactly Once

[Figure: At Least Once, Idempotency and Operator Logic combine to give End to End Exactly Once in Apex]

Page 25: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

End to End Exactly Once

• Without At Least Once -- Possible data loss
• Without Idempotency -- Possible data loss, possible wrong output
• Without operator logic -- Possible idempotency loss, possible duplicate data

Page 26: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Resources

• Blog: End to end exactly once - https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/
• App Hub: https://www.datatorrent.com/apphub/
• App templates: https://github.com/DataTorrent/app-templates/

Page 27: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Resources

• http://apex.apache.org/
• Learn more: http://apex.apache.org/docs.html
• Subscribe - http://apex.apache.org/community.html
• Download - http://apex.apache.org/downloads.html
• Follow @ApacheApex - https://twitter.com/apacheapex
• Meetups - http://www.meetup.com/pro/apacheapex/
• More examples: https://github.com/DataTorrent/examples
• Slideshare: http://www.slideshare.net/ApacheApex/presentations
• https://www.youtube.com/results?search_query=apache+apex
• Free Enterprise License for Startups - https://www.datatorrent.com/product/startup-accelerator/

Page 28: End-to-End "Exactly-Once" processing using Apache Apex (Next Gen Hadoop)

Q&A

Thank you