Smart Partitioning with Apache Apex (Webinar)
TRANSCRIPT
Smart Partitioning with Apache Apex
Pramod Immaneni, Architect, PMC member
Thomas Weise, Architect & Co-founder, PMC member
May 19, 2016
Stream Processing
• Data from a variety of sources (IoT, Kafka, files, social media, etc.)
• Unbounded stream data
  ᵒ Batch can be processed as a stream (but a stream is not a batch)
• (In-memory) processing with temporal boundaries (windows)
• Stateful operations: aggregation, rules, … -> analytics
• Results stored to a variety of sinks or destinations
  ᵒ A streaming application can also serve data with very low latency
2
[Diagram: example pipeline — Browser -> Web Server -> logs -> Kafka -> Kafka Input (logs) -> Decompress, Parse, Filter -> Dimensions Aggregate -> Kafka]
Apache Apex Features
• In-memory stream processing platform
  ᵒ Developed since 2012, ASF TLP since 04/2016
• Unobtrusive Java API to express (custom) logic
• Scale-out, distributed, parallel
• High-throughput & low-latency processing
• Windowing (temporal boundary)
• Reliability, fault tolerance, operability
• Hadoop native
• Compute locality, affinity
• Dynamic updates, elasticity
3
Big Data Processing & Partitioning
• Large amount of data to process
• Data could be streaming in at high velocity
• Pipelining and partitioning to solve the problem
• Partitioning
  ᵒ Run the same logic in multiple processes or threads
  ᵒ Each partition processes a subset of the data
• Apex supports partitioning out of the box
  ᵒ Different partitioning schemes
  ᵒ Unification
  ᵒ Static & dynamic partitioning
  ᵒ Separation of processing logic from scaling decisions
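The partitioning idea above — the same logic running in multiple instances, each processing a subset of the data — can be sketched in plain Java. This is an illustration only, not the Apex API; `partitionFor` and `partition` are made-up helper names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of key-based partitioning: every tuple with the same
// key is routed to the same partition, so identical logic can run per
// partition on a disjoint subset of the data. Not Apex code.
public class PartitionSketch {

    // Maps a key to a partition index; floorMod keeps the result
    // non-negative even when hashCode() is negative.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Splits a list of events into per-partition sublists.
    static Map<Integer, List<String>> partition(List<String> events, int n) {
        Map<Integer, List<String>> parts = new HashMap<>();
        for (String e : events) {
            parts.computeIfAbsent(partitionFor(e, n), k -> new ArrayList<>()).add(e);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c", "a", "d");
        // Both occurrences of "a" land in the same partition.
        System.out.println(partition(events, 3));
    }
}
```

In a real deployment each of these partitions would be a separate operator instance in its own process; the routing rule is the part that matters here.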
4
5
Apex Platform Overview
6
Native Hadoop Integration
• YARN is the resource manager
• HDFS used for storing any persistent state
7
Application Development Model
A Stream is a sequence of data tuples.
A typical Operator takes one or more input streams, performs computations & emits one or more output streams.
• Each Operator is YOUR custom business logic in Java, or a built-in operator from our open source library
• An Operator has many instances that run in parallel, and each instance is single-threaded
A Directed Acyclic Graph (DAG) is made up of operators and streams.
[Diagram: Directed Acyclic Graph (DAG) — operators connected by streams of tuples (filtered stream, enriched stream, output stream)]
8
Streaming Windows
• Application window
• Sliding window and tumbling window
• Checkpoint window
• No artificial latency
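A tumbling window can be sketched as a simple timestamp-to-window-id mapping. The 500 ms size below is chosen for illustration (it matches Apex's default streaming window size); the code is a sketch, not the platform's internal windowing logic:

```java
// Sketch: assigning event timestamps to tumbling windows. All events whose
// timestamps fall in [id * size, (id + 1) * size) share a window id, so
// per-window state can be aggregated and discarded when the window closes.
public class WindowSketch {

    static long tumblingWindowId(long timestampMs, long windowSizeMs) {
        return timestampMs / windowSizeMs;
    }

    public static void main(String[] args) {
        long size = 500; // assumed 500 ms window size
        System.out.println(tumblingWindowId(1250, size)); // 2
        System.out.println(tumblingWindowId(1499, size)); // 2 (same window)
        System.out.println(tumblingWindowId(1500, size)); // 3 (next window)
    }
}
```

A sliding window differs in that one event can belong to several overlapping windows rather than exactly one.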
9
Partitioning: NxM Partitions and Unifiers
[Diagram: logical DAG 0 -> 1 -> 2 -> 3; operator 1 runs as 3 partitions feeding a unifier]
[Diagram: physical DAG with (1a, 1b, 1c) and (2a, 2b) — a unifier per downstream partition: no bottleneck]
[Diagram: physical DAG with (1a, 1b, 1c) and (2a, 2b) — a single intermediate unifier: bottleneck on the intermediate unifier]
10
Advanced Partitioning
[Diagram: logical DAG 0 -> 1 -> 2 -> 3 -> 4; physical DAG with operator 1 as 1a, 1b feeding a unifier]
[Diagram: physical DAG with parallel partition — independent pipelines (1a -> 2a -> 3a) and (1b -> 2b -> 3b) unified before operator 4]
Cascading Unifiers
[Diagram: execution plans across containers with NICs — logical plan; execution plan for N = 4, M = 1; execution plan for N = 4, M = 1, K = 2 with cascading unifiers reducing fan-in per container]
11
Dynamic Partitioning
• Partitioning change while application is running
  ᵒ Change number of partitions at runtime based on stats
  ᵒ Determine initial number of partitions dynamically
• Kafka operators scale according to the number of Kafka partitions
  ᵒ Supports re-distribution of state when the number of partitions changes
  ᵒ API for custom scaler or partitioner
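The kind of stats-driven scaling decision described above can be sketched as a pure function. The thresholds and the doubling/halving policy below are made-up illustration values, not Apex defaults:

```java
// Sketch of a throughput-driven scaling decision: double the partition
// count when per-partition load exceeds a maximum, halve it when load
// drops below a minimum. Policy and thresholds are illustrative only.
public class ScaleDecision {

    static int decidePartitions(int current, long tuplesPerSec,
                                long maxPerPartition, long minPerPartition) {
        long perPartition = tuplesPerSec / current;
        if (perPartition > maxPerPartition) {
            return current * 2; // scale out
        }
        if (perPartition < minPerPartition && current > 1) {
            return current / 2; // scale in
        }
        return current; // no change
    }

    public static void main(String[] args) {
        System.out.println(decidePartitions(2, 4000, 1000, 100)); // 4 (scale out)
        System.out.println(decidePartitions(4, 200, 1000, 100));  // 2 (scale in)
        System.out.println(decidePartitions(2, 1000, 1000, 100)); // 2 (no change)
    }
}
```

In Apex the analogous decision would be made by a StatsListener observing operator metrics; the function above only captures the decision logic itself.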
[Diagram: partition sets changing at runtime — (1a, 1b) and (2a, 2b) scaling to (2a, 2b, 2c, 2d), downstream operator 3 splitting into 3a, 3b; unifiers not shown]
Kafka Consumer Partitioning
12
[Diagram: 1-to-1 partitioning vs. 1-to-N partitioning of Kafka consumers]
13
File Reader Partitioning
[Diagram: files File1…Filen distributed across four reader partitions — (File1, File5, …), (File2, File6, …), (File3, File7, …), (File4, File8, …)]
• User can specify the number of partitions to start with
• Partitions are created accordingly
• Files are distributed among the partitions
• User can change the number of partitions at runtime by setting a property
• New partitions will get created automatically
• Remaining files will be balanced among the new partition set
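The file-to-partition assignment above can be sketched as a simple index-based distribution. Round-robin is an assumption here — it reproduces the File1/File5 grouping shown on the slide, but is not necessarily the operator's actual algorithm:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: distributing files round-robin across reader partitions, so that
// with 4 partitions File1 and File5 share partition 0, File2 and File6
// share partition 1, and so on. Illustration only.
public class FileBalanceSketch {

    static Map<Integer, List<String>> assign(List<String> files, int partitions) {
        Map<Integer, List<String>> plan = new TreeMap<>();
        for (int i = 0; i < files.size(); i++) {
            plan.computeIfAbsent(i % partitions, k -> new ArrayList<>()).add(files.get(i));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("File1", "File2", "File3", "File4", "File5");
        // Re-running assign with a new partition count rebalances the files.
        System.out.println(assign(files, 4));
        System.out.println(assign(files, 2));
    }
}
```

Calling `assign` again with a different partition count models the runtime rebalance: the remaining files simply get redistributed over the new partition set.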
14
Block Reader Partitioning
[Diagram: blocks of File1…Filen (BlkF1, BlkF2, …) distributed across reader partitions]
• Users can specify a minimum and maximum number of partitions
• New partitions are added on the fly as there are more blocks to read
• Partitions are released when there are no pending blocks
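The min/max behavior above amounts to clamping the desired partition count by the number of pending blocks. This is a simplified model; `desiredPartitions` is a hypothetical helper, not the operator's actual API:

```java
// Sketch: block reader scaling — grow with the number of pending blocks,
// but stay within the configured [min, max] bounds. Simplified model only.
public class BlockReaderScaleSketch {

    static int desiredPartitions(int pendingBlocks, int min, int max) {
        return Math.max(min, Math.min(max, pendingBlocks));
    }

    public static void main(String[] args) {
        System.out.println(desiredPartitions(10, 1, 4)); // 4 (capped at max)
        System.out.println(desiredPartitions(2, 1, 4));  // 2 (one per block)
        System.out.println(desiredPartitions(0, 1, 4));  // 1 (floor at min)
    }
}
```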
15
Throughput Partitioner
• Scale based on thresholds for processed events
• Demo: location tracking, scale with number of active devices
16
How dynamic partitioning works
• Partitioning decision (yes/no) by trigger (StatsListener)
  ᵒ Pluggable component, can use any system or custom metric
  ᵒ Externally driven partitioning example: KafkaInputOperator
• Stateful!
  ᵒ Uses checkpointed state
  ᵒ Ability to transfer state from old to new partitions (partitioner, customizable)
  ᵒ Steps:
    • Call partitioner
    • Modify physical plan, rewrite checkpoints as needed
    • Undeploy old partitions from execution layer
    • Release/request container resources
    • Deploy new partitions (from rewritten checkpoint)
  ᵒ No loss of data (buffered)
  ᵒ Incremental operation; partitions that don't change continue processing
• API: Partitioner interface
17
Writing custom partitioner
• Partitioner accepts the set of partitions as of the last checkpoint (frozen state) and returns the set of new partitions
• Access to operator state
• Control partitioning for each input port
• Can implement any custom logic to derive new from old partitions
• StatsListener
  ᵒ public Response processStats(BatchedOperatorStats stats)
  ᵒ Throughput, latency, CPU, …
• Partitioner
  ᵒ public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
• Examples: https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/partitioner
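To illustrate deriving a new partition set from the old one while carrying state over, here is a self-contained sketch. The `Partition` and operator classes below are simplified stand-ins so the example compiles on its own; only the `definePartitions` name mirrors the real Apex `Partitioner` interface quoted above:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Self-contained sketch of stateful repartitioning: each old partition is
// split in two, and its checkpointed state (a running count here) is
// divided between the replacements so no state is lost.
public class CustomPartitionerSketch {

    // Simplified stand-in for the platform's Partition<T> wrapper.
    static class Partition<T> {
        final T operator;
        Partition(T operator) { this.operator = operator; }
    }

    // A toy operator whose only state is a running count.
    static class CounterOperator {
        final long count;
        CounterOperator(long count) { this.count = count; }
    }

    // Derives a new partition set from the old one, transferring state.
    static Collection<Partition<CounterOperator>> definePartitions(
            Collection<Partition<CounterOperator>> oldPartitions) {
        List<Partition<CounterOperator>> result = new ArrayList<>();
        for (Partition<CounterOperator> p : oldPartitions) {
            long half = p.operator.count / 2;
            result.add(new Partition<>(new CounterOperator(half)));
            result.add(new Partition<>(new CounterOperator(p.operator.count - half)));
        }
        return result;
    }

    public static void main(String[] args) {
        Collection<Partition<CounterOperator>> old =
                Collections.singletonList(new Partition<>(new CounterOperator(10)));
        Collection<Partition<CounterOperator>> fresh = definePartitions(old);
        long total = fresh.stream().mapToLong(p -> p.operator.count).sum();
        // Two partitions now, and the total state is still 10.
        System.out.println(fresh.size() + " partitions, total state " + total);
    }
}
```

The invariant worth testing in a real partitioner is the same as here: the state of the new partition set must account for all the state of the old one.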
18
How tuples are split between partitions
• Tuple hashcode and mask used to determine destination partition
  ᵒ Mask picks the last n bits of the hashcode of the tuple
  ᵒ hashCode method can be overridden
• StreamCodec can be used to specify a custom hashcode for tuples
  ᵒ Can also be used for specifying custom serialization
tuple: {Name, 24204842, San Jose}
Hashcode: 001010100010101
Mask: 11 (last 2 bits)

Masked bits | Partition
00          | 1
01          | 2
10          | 3
11          | 4
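The table above comes down to one bitwise operation: the destination is the tuple's hashcode ANDed with the mask. A minimal sketch (0x3 is the 2-bit mask, i.e. binary 11):

```java
// Sketch: selecting a destination partition from the masked low bits of
// a tuple's hashcode. With a 2-bit mask there are 4 possible destinations.
public class PartitionMaskDemo {

    // Returns the masked low bits of the tuple's hashcode (0..mask).
    static int maskedBits(Object tuple, int mask) {
        return tuple.hashCode() & mask;
    }

    public static void main(String[] args) {
        int mask = 0x3; // binary 11: keep the last 2 bits
        String tuple = "Name,24204842,San Jose";
        // Maps to one of 4 partitions; the same tuple always maps the same way.
        System.out.println("masked bits: " + maskedBits(tuple, mask));
    }
}
```

Overriding `hashCode` (or supplying a StreamCodec) changes which bits are fed into this computation, and therefore how tuples spread across partitions.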
19
Custom splits
• Custom distribution of tuples
  ᵒ E.g., broadcast
tuple: {Name, 24204842, San Jose}
Hashcode: 001010100010101
Mask: 00 (broadcast)

Masked bits | Partition
00          | 1
00          | 2
00          | 3
00          | 4
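A broadcast split can be modeled by giving every partition a (mask, key) pair and delivering a tuple to each partition whose masked hashcode matches its key; with mask 0, every partition matches every tuple. This is a simplified model of the idea, not Apex's actual partition-keys mechanism:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: custom tuple splits. Each partition holds a (mask, key) pair and
// receives every tuple where (hashcode & mask) == key. A mask of 0 makes
// every partition match every tuple, i.e. a broadcast. Simplified model.
public class BroadcastSketch {

    static List<Integer> destinations(int hashcode, int[] masks, int[] keys) {
        List<Integer> dests = new ArrayList<>();
        for (int p = 0; p < masks.length; p++) {
            if ((hashcode & masks[p]) == keys[p]) {
                dests.add(p);
            }
        }
        return dests;
    }

    public static void main(String[] args) {
        // Hash split: 2-bit mask, distinct keys -> exactly one destination.
        System.out.println(destinations(0b0101, new int[]{3, 3, 3, 3},
                                        new int[]{0, 1, 2, 3})); // [1]
        // Broadcast: mask 0 everywhere -> all four destinations.
        System.out.println(destinations(0b0101, new int[]{0, 0, 0, 0},
                                        new int[]{0, 0, 0, 0})); // [0, 1, 2, 3]
    }
}
```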
20
Resources
• http://apex.apache.org/
• Learn more: http://apex.apache.org/docs.html
• Subscribe: http://apex.apache.org/community.html
• Download: http://apex.apache.org/downloads.html
• Follow @ApacheApex: https://twitter.com/apacheapex
• Meetups: http://www.meetup.com/pro/apacheapex/
• More examples: https://github.com/DataTorrent/examples
• Slideshare: http://www.slideshare.net/ApacheApex/presentations
• YouTube: https://www.youtube.com/results?search_query=apache+apex
• Free Enterprise License for Startups: https://www.datatorrent.com/product/startup-accelerator/
Q&A
21