Smart Partitioning in Apache Apex (Next Gen Hadoop)
TRANSCRIPT
Introduction to Apache Apex
Tushar R. Gosavi, Software Engineer & Apex Committer
2
Stream Processing
•Data from a variety of sources (IoT, Kafka, files, social media, etc.)
•Unbounded, continuous data streams
•A batch can be processed as a stream (but a stream is not a batch)
•(In-memory) processing with temporal boundaries (windows)
•Stateful operations: aggregation, rules, … -> analytics
•Results stored to a variety of sinks or destinations
•A streaming application can also serve data with very low latency
Diagram: example pipeline: Browser → Web Server → Kafka (logs) → Kafka Input → Decompress/Parse/Filter → Dimensions Aggregate → Kafka
3
Apache Apex
•In-memory, distributed stream processing
 ᵒ Application logic broken into components called operators that run in a distributed fashion across your cluster
•Natural programming model
 ᵒ Unobtrusive Java API to express (custom) logic
 ᵒ Maintain state and metrics in your member variables
•Scalable, high throughput, low latency
 ᵒ Operators can be scaled up or down at runtime according to the load and SLA
 ᵒ Dynamic scaling (elasticity), compute locality
•Fault tolerance & correctness
 ᵒ Automatically recover from node outages without having to reprocess from the beginning
 ᵒ State is preserved; checkpointing, incremental recovery
 ᵒ End-to-end exactly-once
•Operability
 ᵒ System and application metrics, record/visualize data
 ᵒ Dynamic changes
Apache Apex and DataTorrent Product Stack
Designed to help you at every stage of your data-in-motion pipeline
Diagram: product stack, top to bottom:
• Solutions for Business Problems (Ingestion & Data Prep, ETL Pipelines)
• Ease of Use Tools (GUI Application Assembly, Real-Time Data Visualization, Management & Monitoring)
• Application Templates (e.g. Kafka-to-HDFS, FileSync, HDFS/JDBC connectors)
• High-level API (Transformations, ML & Scoring, SQL, Analytics)
• Apex-Malhar Operator Library
• Apache Apex Core (Dev Framework, Batch Support)
• Big Data Infrastructure: Hadoop 2.x – YARN + HDFS – On-Prem & Cloud
Apex Malhar - Production Ready Operators
• RDBMS: Vertica, MySQL, Oracle, JDBC
• NoSQL: Cassandra, HBase, MongoDB, Aerospike, Accumulo, Couchbase/CouchDB, Redis
• Messaging: Kafka, Solace, Flume, ActiveMQ, Kinesis
• File Systems: HDFS/Hive, NFS, S3
• Parsers: XML, JSON, CSV, Avro, Parquet
• Transformations: Filters, Rules, Expression, Dedup, Enrich
• Analytics: Dimensional Aggregations
• Protocols: HTTP, FTP, WebSocket, MQTT, SMTP
• Other: Elasticsearch, Script (JavaScript, Python, R), Geode, NiFi, Twitter
Operator Maturity Framework
Scalability and Idempotency
Documentation
Example Applications
Internal Certification
Benchmarking
6
Application Development Model
▪ A Stream is a sequence of data tuples
▪ A typical Operator takes one or more input streams, performs computations & emits one or more output streams
 • Each Operator is YOUR custom business logic in Java, or a built-in operator from our open source library
 • An Operator has many instances that run in parallel, and each instance is single-threaded
▪ A Directed Acyclic Graph (DAG) is made up of operators and streams
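The operator/stream model above can be sketched in plain Java. This is only an illustration of the concept, not the Apex API; the operator logic and names here are made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Minimal sketch of the DAG model: an "operator" consumes the tuples of an
// input stream and emits an output stream; chaining operators forms a DAG.
class MiniDag {
    static <I, O> List<O> operate(List<I> in, Function<I, O> logic) {
        List<O> out = new ArrayList<>();
        for (I tuple : in) {
            out.add(logic.apply(tuple)); // each tuple processed independently
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a,1", "b,2");                // input stream
        List<String[]> parsed = operate(lines, s -> s.split(",")); // parse operator
        List<String> keys = operate(parsed, p -> p[0]);            // project operator
        System.out.println(keys); // prints [a, b]
    }
}
```

In Apex each such operator would run as many parallel single-threaded instances, which is what the partitioning slides below are about.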
Diagram: a Directed Acyclic Graph (DAG) of operators connected by streams (output stream, filtered stream, enriched stream), with tuples flowing between operator instances.
7
Native Hadoop Integration
• YARN is the resource manager
• HDFS for storing persistent state
8
Fault Tolerance & Checkpointing
▪ Application window
▪ Sliding window and tumbling window
▪ Checkpoint window
▪ No artificial latency
Fault Tolerance
9
• Operator state is checkpointed to a persistent store
 ᵒ Automatically performed by the engine, no additional work needed by the operator
 ᵒ In case of failure, operators are restarted from checkpointed state
 ᵒ Frequency configurable per operator
 ᵒ Asynchronous and distributed by default
 ᵒ Default store is HDFS
• Automatic detection and recovery of failed operators
 ᵒ Heartbeat mechanism
• Buffering mechanism ensures replay of data from the recovery point so that there is no loss of data
• Application master state is checkpointed
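The checkpoint/restore cycle above can be sketched in plain Java. Apex does this for you (serializing operator state, by default to HDFS); this standalone demo just uses Java serialization to an in-memory byte array:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch only: checkpoint = serialize operator state to the store,
// recovery = deserialize the last checkpoint and resume from there.
class CheckpointDemo {
    static byte[] checkpoint(Serializable state) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(state); // stand-in for writing to HDFS
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static Object restore(byte[] bytes) {
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return ois.readObject(); // stand-in for recovery after operator failure
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```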
Buffer Server
• In-memory PubSub
• Stores results emitted by an operator until committed
• Handles backpressure / spillover to local disk
• Ordering, idempotency
Diagram: Operator 1 (Container 1, Node 1) → Buffer Server → Operator 2 (Container 2, Node 2)
10
11
End-to-End Exactly Once
•Important when writing to external systems
•Data should not be duplicated or lost in the external system in case of application failures
•Common external systems
 ᵒ Databases
 ᵒ Files
 ᵒ Message queues
•Exactly-once = at-least-once + idempotency + consistent state
•Data duplication must be avoided when data is replayed from checkpoint
•Operators implement the logic dependent on the external system
•Platform provides checkpointing and repeatable windowing
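The recipe above (at-least-once replay + idempotent writes) can be sketched in plain Java. This is an illustration, not an Apex operator: the sink remembers the last committed window id, so windows replayed from a checkpoint after a failure are skipped rather than written twice:

```java
import java.util.ArrayList;
import java.util.List;

// Idempotent sink sketch: replaying a window that was already committed
// has no effect, so at-least-once replay becomes exactly-once output.
class IdempotentWriter {
    private long committedWindow = -1;
    private final List<String> sink = new ArrayList<>();

    // Called at the end of each streaming window with that window's tuples.
    void endWindow(long windowId, List<String> tuples) {
        if (windowId <= committedWindow) {
            return; // replayed window: already written, skip to avoid duplicates
        }
        sink.addAll(tuples);
        committedWindow = windowId; // commit marker stored alongside the data
    }

    List<String> contents() {
        return sink;
    }
}
```

In a real operator the commit marker must be persisted atomically with the output (e.g. in the same database transaction), which is why the logic is system-specific.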
12
Partitioning (Batch)
13
Partitioning (Streaming)
Diagram: tuples from operator A's output stream are distributed across multiple partitions of operator B.
14
How tuples are partitioned
• Tuple hashcode and mask are used to determine the destination partition
 ᵒ Mask picks the last n bits of the hashcode of the tuple
 ᵒ The hashCode method can be overridden
• A StreamCodec can be used to specify a custom hashcode for tuples
 ᵒ Can also be used for specifying custom serialization
Example: tuple {Name, 24204842, San Jose}, hashcode 001010100010101, mask 0b11 (last two bits):

Last 2 bits | Partition
00          | 1
01          | 2
10          | 3
11          | 4
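The hash-and-mask routing above is a one-liner in plain Java (a sketch of the idea, not the engine's actual code):

```java
// Destination partition = the tuple's hashcode with only the last n bits
// kept by the mask. A two-bit mask (0b11) yields four partitions (0..3).
class HashMaskRouting {
    static int partitionFor(Object tuple, int mask) {
        return tuple.hashCode() & mask;
    }

    public static void main(String[] args) {
        // Any tuple maps to one of four partitions with a two-bit mask.
        System.out.println(partitionFor("Name,24204842,San Jose", 0b11));
    }
}
```

Overriding `hashCode` (or supplying a StreamCodec) changes which partition a tuple lands on without touching the routing machinery.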
15
Custom distribution of tuples, e.g. broadcast
Example: tuple {Name, 24204842, San Jose}, hashcode 001010100010101, mask 0b00; every partition subscribes to key 00, so each tuple goes to all partitions:

Key | Partition
00  | 1
00  | 2
00  | 3
00  | 4
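The broadcast case above can be sketched in plain Java (illustrative only): each partition subscribes to a key, and a tuple is delivered to every partition whose key equals the masked hashcode. With mask 0 all partitions share key 0, so every tuple reaches all of them:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of key-based distribution: a tuple goes to every partition whose
// subscribed key matches (hashcode & mask). Mask 0 => broadcast.
class BroadcastRouting {
    static List<Integer> destinations(Object tuple, int mask, int[] partitionKeys) {
        List<Integer> dests = new ArrayList<>();
        int key = tuple.hashCode() & mask;
        for (int p = 0; p < partitionKeys.length; p++) {
            if (partitionKeys[p] == key) {
                dests.add(p);
            }
        }
        return dests;
    }
}
```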
16
Why Dynamic Partitioning?
Chart: data volume varies month to month (Feb through Apr of the following year) and hourly load (total events vs. time) fluctuates, while the input allocation stays fixed.
17
Dynamic Partitioning
Diagram: partitions of operator B are added and removed at runtime while operator A continues processing.
18
Partitioning Support in Apex
•Customizable partitioning schemes
 ᵒ User-defined data distribution policies (StreamCodec)
 ᵒ Parallel partitioning
•Unification of results
 ᵒ User-defined unifiers (Unifier)
•Static & dynamic partitioning
 ᵒ User-defined hooks to allow state transfer (Partitioner)
 ᵒ Dynamic partitioning using a user-defined trigger (StatsListener)
•Separation of processing logic from scaling decisions
 ᵒ Partitioning logic is separated from business logic, set through platform attributes
 ᵒ Can be set from configuration files
19
Partitioning: NxM Partitions, Unifiers
Diagram panels:
• Logical DAG: 0 → 1 → 2 → 3
• Physical DAG with operator 1 in three partitions (1a, 1b, 1c) and operator 2 in two partitions (2a, 2b), with a unifier per downstream partition: no bottleneck
• Physical DAG with the same partitions but a single intermediate unifier: bottleneck on the intermediate unifier
• Logical diagram: Input → Compute → Output; physical diagram: Input → Compute partitions → Unifier → Output
Set Attribute – PARTITIONER
20
Parallel Partitioning
Diagram: logical DAG 0 → 1 → 2 → 3 → 4. Without parallel partitioning, the partitions (1a, 1b), (2a, 2b), (3a, 3b) are unified between stages before operator 4. With parallel partitioning, each upstream partition feeds its own downstream chain (1a → 2a → 3a, 1b → 2b → 3b), unified only before operator 4.
21
Parallel Partitioning
Diagram: with independent parallel pipelines, the entire logical DAG (0 → 1 → 2 → 3 → 4) is replicated per partition, each copy running as a separate pipeline.
Set Input Port Attribute - PARTITION_PARALLEL
22
Cascading Unifiers
Logical plan: upstream operator (uopr) → downstream operator (dopr).
Execution plan for N = 4, M = 1: four upstream partitions (uopr1–uopr4) in their own containers send over the network (NIC) to a single unifier in dopr's container.
Execution plan for N = 4, M = 1, K = 2 with cascading unifiers: intermediate unifiers, each merging at most two streams, run in separate containers and feed the final unifier at dopr.
Set Unifier Attribute UNIFIER_LIMIT
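The arithmetic behind UNIFIER_LIMIT can be sketched in plain Java: if each unifier merges at most k input streams, n upstream partitions are merged in ceil(log_k(n)) cascaded stages instead of one n-way unifier on a single node:

```java
// Sketch of the cascading-unifier fan-in: each stage reduces the number of
// streams by a factor of k until a single unified stream remains.
class CascadingUnifiers {
    static int stages(int n, int k) {
        int s = 0;
        while (n > 1) {
            n = (n + k - 1) / k; // ceil(n / k) unifiers in this stage
            s++;
        }
        return s;
    }
}
```

For the slide's example (N = 4, K = 2) this gives two stages: two unifiers merging two streams each, then one final unifier.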
23
Dynamic Partitioning
• Partitioning change while the application is running
ᵒ Change the number of partitions at runtime based on stats
ᵒ Determine the initial number of partitions dynamically
• Kafka operators scale according to the number of Kafka partitions
ᵒ Supports re-distribution of state when the number of partitions changes
ᵒ API for custom scaler or partitioner
Diagram: operator 2 scales from two partitions (2a, 2b) to four (2a–2d), and operator 3 from one partition to two (3a, 3b); unifiers not shown.
24
How dynamic partitioning works
• Partitioning decision (yes/no) by trigger (StatsListener)
ᵒ Pluggable component, can use any system or custom metric
ᵒ Externally driven partitioning example: KafkaInputOperator
• Stateful!
ᵒ Uses checkpointed state
ᵒ Ability to transfer state from old to new partitions (Partitioner, customizable)
ᵒ Steps:
 • Call partitioner
 • Modify physical plan, rewrite checkpoints as needed
 • Undeploy old partitions from execution layer
 • Release/request container resources
 • Deploy new partitions (from rewritten checkpoint)
ᵒ No loss of data (buffered)
ᵒ Incremental operation; partitions that don't change continue processing
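A trigger of the kind described above might decide the target partition count from a throughput metric. The following is hypothetical StatsListener-style logic in plain Java, not the Apex API; the metric and threshold names are made up:

```java
// Illustrative scaling rule: keep per-partition throughput under a target,
// never dropping below one partition.
class LoadBasedScaler {
    static int targetPartitions(long tuplesPerSec, long maxPerPartition) {
        long needed = (tuplesPerSec + maxPerPartition - 1) / maxPerPartition; // ceil
        return (int) Math.max(1, needed);
    }
}
```

In Apex the actual decision is made by a pluggable StatsListener, and the state transfer to the new partitions is handled by the Partitioner.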
25
Examples
26
Kafka Consumer Partitioning
1 to 1 Partitioning 1 to N Partitioning
27
Kafka 1-1 dynamic partitioning
Diagram: three parallel partitions (Part 1, Part 2, Part 3). Each partition runs the complete pipeline (KafkaInput, CSVParser, Filter, JSONFormatter, Project, LineWriter) over the streams Lines, Words, Filtered, Formatted, Projected, writing to a file.
28
Kafka 1-1 dynamic partitioning
Diagram: the same pipeline, now with four partitions (Part 1 through Part 4); when an additional Kafka partition appears, an additional pipeline partition is created automatically.
29
File Reader Partitioning
Files: File1, File2, File3, File4, File5, …, Filen
Partition 1: File1, File5, …
Partition 2: File2, File6, …
Partition 3: File3, File7, …
Partition 4: File4, File8, …
• User can specify the number of partitions to start with
• Partitions are created accordingly
• Files are distributed among the partitions
• User can change the number of partitions at runtime by setting a property
• New partitions are created automatically
• Remaining files are balanced among the new partition set
# apexcli> set-operator-property requiredPartitions 10
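The round-robin file distribution shown above can be sketched in plain Java (an illustration of the assignment, not the actual reader implementation):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// File i goes to partition (i mod partitions), reproducing the
// File1/File5 -> partition 1, File2/File6 -> partition 2, ... layout.
class FileAssignment {
    static Map<Integer, List<String>> assign(List<String> files, int partitions) {
        Map<Integer, List<String>> byPartition = new HashMap<>();
        for (int i = 0; i < files.size(); i++) {
            byPartition.computeIfAbsent(i % partitions, p -> new ArrayList<>())
                       .add(files.get(i));
        }
        return byPartition;
    }
}
```

When the partition count changes at runtime, the remaining (unread) files are redistributed the same way over the new partition set.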
30
Block Reader Partitioning
Files: File1, File2, File3, File4, File5, …, Filen
Each reader partition consumes blocks: BlkF1, BlkF2, …
• Users can specify a minimum and maximum number of partitions
• New partitions are added on the fly as there are more blocks to read
• Partitions are released when there are no pending blocks

reader.setMinReaders(1)
reader.setMaxReaders(16)
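The min/max elasticity above amounts to a clamp. A plain-Java sketch of the rule (illustrative, not the block reader's actual code):

```java
// Aim for one reader per pending block, clamped between the configured
// minimum and maximum (cf. setMinReaders / setMaxReaders above).
class BlockReaderScaling {
    static int desiredReaders(int pendingBlocks, int min, int max) {
        return Math.max(min, Math.min(max, pendingBlocks));
    }
}
```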
GE powers Industrial IoT applications with DataTorrent
18 | © 2016 DataTorrent Confidential – Do Not Distribute
GE is dedicated to providing advanced IoT analytics solutions to thousands of customers who are using their devices and sensors across different verticals. GE has built a sophisticated analytics platform, Predix, to help its customers develop and execute Industrial IoT applications and gain real-time insights as well as actions.
Business Need:
• Ingest and analyze high-volume, high-speed data from thousands of devices and sensors per customer in real time without data loss
• Predictive analytics to reduce costly maintenance and improve customer service
• Unified monitoring of all connected sensors and devices to minimize disruptions
• Fast application development cycle
• High scalability to meet changing business and application workloads

DataTorrent Solution:
• Built an ingestion application using the DataTorrent Enterprise platform
• Powered by Apache Apex
• In-memory stream processing
• Built-in fault tolerance
• Dynamic scalability
• Comprehensive library of pre-built operators, including connectors
• Management UI console

Client Outcome:
• Helps GE improve performance and lower costs
• Helps GE detect possible failures and minimize unplanned downtime with easy, centralized management & monitoring of devices
• Enables faster innovation with a short application development cycle
• No data loss and 24x7 availability of applications
• Helps GE easily adjust to scalability needs with auto-scaling
CapitalOne prevents fraudulent credit card transactions in real-time with DataTorrent
Capital One is the eighth-largest bank holding company in the US and specializes in credit cards, home loans, auto loans, banking and savings products. It is using Apache Apex to build its next-generation decisioning platform, achieving an ultra-low latency of under 2 ms for decision making and handling bursts of 2,000 events at a net rate of 70,000 events/sec.
Business Need:
• Decline fraudulent credit card transactions
• Decision to authorize or decline a transaction in real time; SLA is 40 ms (ideally 15 ms)
• Process 2,000 events per 10 ms
• Transactions may be declined based on a transaction attribute or several attributes, and on dynamically changing rules (e.g. an account is disabled or an account limit is reached)
• Open source
• Easy integration with the existing infrastructure: HDFS, messaging queues & in-memory data grids
• No data loss and 24x7 availability of applications

DataTorrent Solution:
• DataTorrent 2.0 Enterprise platform, powered by Apache Apex
• In-memory stream processing
• Comprehensive library of pre-built operators, including connectors
• The only enterprise-ready solution
• Built-in fault tolerance
• Dynamically scalable
• Uses a precomputed ML model for scoring
• Management UI & Data Visualization console

Client Outcome:
• Replaces the existing mainframe system for Capital One
• Achieved 0.25 ms latency (on average), 2 ms latency (99.99th percentile), 16 ms (99.9999th percentile)
33
Resources
• Apache Apex - http://apex.apache.org/
• Subscribe to forums
 ᵒ Apex - http://apex.apache.org/community.html
 ᵒ DataTorrent - https://groups.google.com/forum/#!forum/dt-users
• Download - https://datatorrent.com/download/
• Twitter
 ᵒ @ApacheApex - https://twitter.com/apacheapex
 ᵒ @DataTorrent - https://twitter.com/datatorrent
• Meetups - http://meetup.com/topics/apache-apex
• Webinars - https://datatorrent.com/webinars/
• Videos - https://youtube.com/user/DataTorrent
• Slides - http://slideshare.net/DataTorrent/presentations
• Startup Accelerator Program (full-featured enterprise product) - https://datatorrent.com/product/startup-accelerator/
• Big Data Application Templates Hub - https://datatorrent.com/apphub
34
We Are Hiring
• [email protected]
• Developers/Architects
• QA Automation Developers
• Information Developers
• Build and Release
• Community Leaders
© 2015 DataTorrent
Resources
35
Apache Apex Community Page
Apache Apex LinkedIn Group
www.ApexBigData.com
36
Q&A
Thank you