1
Stefan Richter @stefanrrichter
April 11, 2017
Improvements for large state in Apache Flink
State in Streaming Programs
2
case class Event(producer: String, evtType: Int, msg: String)
case class Alert(msg: String, count: Long)

env.addSource(…)
  .map(bytes => Event.parse(bytes))
  .keyBy("producer")
  .mapWithState { (event: Event, state: Option[Int]) =>
    // pattern rules
  }
  .filter(alert => alert.msg.contains("CRITICAL"))
  .keyBy("msg")
  .timeWindow(Time.seconds(10))
  .sum("count")
Dataflow: Source -> map() -> keyBy -> mapWithState() -> filter() -> keyBy -> window() -> sum()
State in Streaming Programs
3
(Same program and dataflow as on the previous slide; the diagram now marks each operator as stateless or stateful: the operators that keep per-key state, mapWithState() and the keyed window()/sum(), are the stateful ones.)
Internal vs External State
4
External State
• State lives in a separate data store
• "State capacity" can scale independently of the stream processor
• Usually much slower than internal state
• Hard to get "exactly-once" guarantees

Internal State
• State lives in the stream processor
• Faster than external state
• Working area is local to the computation
• Checkpoints go to a stable store (DFS)
• Always exactly-once consistent
• The stream processor has to handle scalability
Keyed State Backends
5
HeapKeyedStateBackend
- State lives in memory, on the Java heap
- Operates on objects
- Think of a hash map {key obj -> state obj}
- Async snapshots supported

RocksDBKeyedStateBackend
- State lives in off-heap memory and on disk
- Operates on bytes, uses serialization
- Think of a K/V store {key bytes -> state bytes}
- Log-structured merge (LSM) tree
- Async snapshots
- Incremental snapshots (configuration sketch below)
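To make the comparison concrete, here is a minimal configuration sketch in Scala against the Flink 1.3-era API for selecting the RocksDB backend with incremental checkpointing enabled; the checkpoint URI, the interval and the job name are placeholders, not values from the talk.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object RocksDbBackendExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Keyed state lives in RocksDB (off-heap memory and local disk); checkpoints
    // are written to the given DFS path. The second argument enables incremental
    // checkpoints (available from Flink 1.3).
    val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true)
    env.setStateBackend(backend)

    // Take an (asynchronous) checkpoint every 10 seconds.
    env.enableCheckpointing(10000)

    // ... define sources and operators here, then: env.execute("large-state job")
  }
}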
Asynchronous Checkpoints
6
Synchronous Checkpointing
7
Diagram: the Job Manager runs the Checkpoint Coordinator thread, which triggers the checkpoint and later receives the acknowledgement; the Task Manager runs the event processing thread (loop: processElement) and a checkpointing thread that writes the state to the DFS.
Why is async checkpointing so essential for large state?
Synchronous Checkpointing
8
(Same diagram as on the previous slide.)
Problem: all event processing is on hold while the state is written, to avoid concurrent modifications to the state that is being written.
Asynchronous Checkpointing
9
Diagram: with asynchronous checkpointing, the event processing thread only takes a quick snapshot of the state and then continues with processElement, while the checkpointing thread writes the snapshot to the DFS in the background; the Job Manager's Checkpoint Coordinator triggers the checkpoint and receives the acknowledgement as before.
Asynchronous Checkpointing
10
(Same diagram as on the previous slide.)
Problem: How to deal with concurrent modifications?
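The rest of the talk answers this question for RocksDB. As a rough illustration of the general idea (my own sketch, not Flink's code), one way to avoid blocking is to make the in-memory snapshot itself cheap and immutable, so the processing thread continues while a background thread does the slow DFS write:

import java.util.concurrent.Executors

object AsyncSnapshotSketch {
  private val writer = Executors.newSingleThreadExecutor()

  // Immutable map: "snapshotting" is just capturing the current reference.
  @volatile private var state: Map[String, Long] = Map.empty

  def processElement(key: String): Unit =
    state = state.updated(key, state.getOrElse(key, 0L) + 1)

  def triggerCheckpoint(): Unit = {
    val snapshot = state                      // O(1), no locking of processing
    writer.submit(new Runnable {
      def run(): Unit = writeToDfs(snapshot)  // slow I/O happens off the hot path
    })
  }

  private def writeToDfs(s: Map[String, Long]): Unit =
    println(s"wrote ${s.size} entries")       // stand-in for the DFS write

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach(_ => processElement("sensor-1"))
    triggerCheckpoint()          // returns immediately, write happens in background
    processElement("sensor-2")   // processing continues while the snapshot is written
    writer.shutdown()
  }
}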
Incremental Checkpoints
11
What we will discuss
▪ What are incremental checkpoints?
▪ Why is RocksDB so well suited for this?
▪ How do we integrate this with Flink's checkpointing?
Full Checkpointing
13
Diagram: the keyed state evolves over time; at Checkpoint 1, Checkpoint 2 and Checkpoint 3 the complete state as of that point in time is written out each time, even though only a few entries changed between checkpoints.
Incremental Checkpointing
14
Diagram: the same state evolving over time, but now iCheckpoint 1 writes the initial state as Δ(-, c1), while iCheckpoint 2 and iCheckpoint 3 only write the changes since the previous checkpoint, Δ(c1, c2) and Δ(c2, c3).
Incremental Recovery
15
Diagram: the same sequence of incremental checkpoints. Recovery? Restoring from iCheckpoint 3 needs more than the latest increment: the full state is rebuilt by combining Δ(-, c1) + Δ(c1, c2) + Δ(c2, c3) (a small sketch of this follows below).
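As a tiny illustration of that recovery step (illustrative keys and values, not the ones on the slide): applying the deltas in checkpoint order rebuilds the full key/value state, with later deltas overriding earlier values for the same key. Deletions would additionally need tombstone entries, as discussed for RocksDB compaction later.

object IncrementalRecoverySketch {
  def main(args: Array[String]): Unit = {
    // Each delta maps keys to their new values.
    val deltas: Seq[Map[String, String]] = Seq(
      Map("a" -> "1", "b" -> "2"),   // Δ(-,  c1): the first "delta" is the full state
      Map("b" -> "3", "c" -> "4"),   // Δ(c1, c2)
      Map("a" -> "5")                // Δ(c2, c3)
    )

    // Recovery = left-to-right merge of all deltas.
    val recovered = deltas.foldLeft(Map.empty[String, String])(_ ++ _)
    println(recovered) // Map(a -> 5, b -> 3, c -> 4)
  }
}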
RocksDB Architecture (simplified)
16
Diagram: the Memtable lives in memory; below it, immutable SSTable files (…, SSTable-6, SSTable-7) live on storage, each sorted by key and equipped with an index over its {key_1 -> val_1, key_2 -> val_2, …} entries.
- All writes go against the Memtable, a mutable buffer of a couple of MB with unique keys
- The Memtable is periodically flushed to a new SSTable
- Reads consider the Memtable first, then the SSTables
- SSTables are immutable and are periodically merged in the background
- We can consider newly created SSTables as Δs! (see the toy sketch below)
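The following toy sketch (my own illustration, not RocksDB's API) mirrors the write and read paths just described: writes only hit the in-memory Memtable, reads consult the Memtable before the SSTables, and a flush freezes the Memtable into a new immutable "SSTable", which is exactly the delta since the last flush.

import scala.collection.mutable

class ToyLsm {
  private val memtable = mutable.TreeMap.empty[String, String]   // mutable, sorted buffer
  private var sstables = List.empty[Map[String, String]]         // newest first, immutable

  def put(key: String, value: String): Unit = memtable.put(key, value)

  // Reads check the memtable first, then SSTables from newest to oldest.
  def get(key: String): Option[String] =
    memtable.get(key).orElse(sstables.view.flatMap(_.get(key)).headOption)

  // "Flush": freeze the memtable into a new immutable SSTable -- this new
  // table is exactly the delta since the last flush.
  def flush(): Map[String, String] = {
    val sst = memtable.toMap
    sstables = sst :: sstables
    memtable.clear()
    sst
  }
}

object ToyLsmDemo {
  def main(args: Array[String]): Unit = {
    val lsm = new ToyLsm
    lsm.put("k1", "a"); lsm.put("k2", "b")
    val delta = lsm.flush()        // {k1 -> a, k2 -> b} becomes the first SSTable
    lsm.put("k1", "a2")
    println(lsm.get("k1"))         // Some(a2): memtable wins over the older SSTable
    println(delta)                 // the increment that a snapshot could ship
  }
}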
RocksDB Compaction
▪ A background thread merges SSTable files
▪ Removes copies of the same key (the latest version survives)
▪ Actual deletion of keys happens here
17
Example: merging SSTable-1 {2 -> C, 7 -> N, 9 -> Q} with the newer SSTable-2 {1 -> V, 7 -> deleted, 9 -> S} produces SSTable-3 {1 -> V, 2 -> C, 9 -> S}.
Compaction consolidates our Δs!
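A minimal sketch of that merge step (again just an illustration, not RocksDB's code): on a key collision the entry from the newer table wins, and a tombstone removes the key from the merged output entirely.

object CompactionSketch {
  // None models a deletion marker (tombstone).
  type SSTable = Map[Int, Option[String]]

  def merge(older: SSTable, newer: SSTable): SSTable =
    (older ++ newer)                                    // newer entries overwrite older ones
      .filter { case (_, value) => value.isDefined }    // drop deleted keys

  def main(args: Array[String]): Unit = {
    val sst1: SSTable = Map(2 -> Some("C"), 7 -> Some("N"), 9 -> Some("Q"))
    val sst2: SSTable = Map(1 -> Some("V"), 7 -> None, 9 -> Some("S"))
    println(merge(sst1, sst2))   // keys 1, 2 and 9 survive; key 7 is gone
  }
}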
Flink’s Incremental Checkpointing
18
Diagram: the Checkpoint Coordinator (with its SharedStateRegistry) on the Job Manager side, three parallel StatefulMap instances (1/3, 2/3, 3/3) on the Task Manager side, connected via the network, and a DFS as durable checkpoint storage.
Flink’s Incremental Checkpointing
19
Step 1: the Checkpoint Coordinator sends a checkpoint barrier that triggers a snapshot on each StatefulMap instance.
Flink’s Incremental Checkpointing
20
Step 2: each instance writes its incremental snapshot (Δ1, Δ2, Δ3) to distributed storage.
Flink’s Incremental Checkpointing
21
(Same as the previous slide: the increments Δ1, Δ2, Δ3 are written to the DFS.)
Incremental Snapshot of Operator
22
Diagram: on the operator's Local FS, the RocksDB data directory contains the manifest and the SSTable files 00225.sst and 00226.sst; on the Distributed FS://sfmap/1/ there is a share directory, and the Checkpoint Coordinator keeps the SharedState Registry.
Incremental Snapshot of Operator
23
Diagram: when the snapshot is taken, a local chk-1 directory is created next to the data directory, and the manifest plus the SSTable files 00225.sst and 00226.sst are copied (or hard-linked) into it; the share directory on the Distributed FS://sfmap/1/ and the SharedState Registry are not touched yet.
Incremental Snapshot of Operator
24
Diagram: the snapshot is then uploaded asynchronously to the DFS: the SSTable files 00225.sst and 00226.sst go into the share directory of Distributed FS://sfmap/1/, and a chk-1 directory is created there containing the manifest and an sst.list, the list of SSTable files referenced by this snapshot.
Flink’s Incremental Checkpointing
25
Step 3: each instance acknowledges and sends a handle H1, H2, H3 (e.g. a file path in the DFS) for its increment Δ1, Δ2, Δ3 to the Checkpoint Coordinator.
Incremental Snapshot of Operator
26
Diagram: same picture as before, but the SharedState Registry now records the reference counts {00225.sst = 1} {00226.sst = 1} for the SSTable files that were uploaded to the share directory.
Flink’s Incremental Checkpointing
27
Step 4: the Checkpoint Coordinator registers CP 1 with the handles H1, H2, H3 and signals the success of CP 1 to all instances.
Flink’s Incremental Checkpointing
28
What happens when another CP is triggered?
Incremental Snapshot of Operator
29
Diagram: by the time the next snapshot is taken, the local data directory has changed (e.g. through flushes and compaction): it now contains 00226.sst, 00228.sst and 00229.sst, and 00225.sst is gone. On the Distributed FS://sfmap/1/, the share directory still holds 00225.sst and 00226.sst, chk-1 still holds its manifest and sst.list, and the SharedState Registry still records {00225.sst = 1} {00226.sst = 1}.
Incremental Snapshot of Operator
30
Diagram: for the second snapshot, a local chk-2 directory is created next to chk-1 and the data directory; the manifest and the SSTable files 00226.sst, 00228.sst and 00229.sst are copied (or hard-linked) into it. The DFS and the SharedState Registry are unchanged so far.
Incremental Snapshot of Operator
31
Diagram: only the SSTable files that are not yet on the DFS are uploaded asynchronously: 00228.sst and 00229.sst are added to the share directory, while 00226.sst is already there and is simply referenced again. A chk-2 directory with manifest and sst.list is created on Distributed FS://sfmap/1/, and the SharedState Registry now records {00225.sst = 1} {00226.sst = 2} {00228.sst = 1} {00229.sst = 1}.
Deleting Incremental Checkpoints
32
Diagram: the Checkpoint Coordinator now holds CP 1 and CP 2, each referencing its increments Δ1, Δ2, Δ3 through the handles H1, H2, H3. Deleting an outdated checkpoint: CP 1 can now be discarded.
Deleting Incremental Snapshot
33
Diagram: before the deletion, Distributed FS://sfmap/1/ contains the share directory with 00225.sst, 00226.sst, 00228.sst and 00229.sst as well as the chk-1 and chk-2 directories (each with manifest and sst.list); the SharedState Registry records {00225.sst = 1} {00226.sst = 2} {00228.sst = 1} {00229.sst = 1}.
Deleting Incremental Snapshot
34
Diagram: the chk-1 directory is removed from Distributed FS://sfmap/1/ and the reference counts of the SSTable files it referenced are decremented; the SharedState Registry now records {00225.sst = 0} {00226.sst = 1} {00228.sst = 1} {00229.sst = 1}.
Deleting Incremental Snapshot
35
Diagram: because its reference count dropped to zero, 00225.sst is deleted from the share directory; the SharedState Registry now records {00226.sst = 1} {00228.sst = 1} {00229.sst = 1}, and only chk-2 remains on Distributed FS://sfmap/1/.
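The reference counting shown in the registry boxes above can be captured in a few lines; this is a simplified sketch of the idea (class and method names are my own, not Flink's SharedStateRegistry API): registering a checkpoint bumps the count of every SSTable file it references, unregistering decrements the counts, and files that drop to zero can be deleted from the DFS.

import scala.collection.mutable

class ToySharedStateRegistry {
  private val refCounts = mutable.Map.empty[String, Int].withDefaultValue(0)

  // A new checkpoint registers every SSTable file it references.
  def register(files: Seq[String]): Unit =
    files.foreach(f => refCounts(f) += 1)

  // Discarding a checkpoint unregisters its files; files whose count reaches
  // zero are returned so the caller can delete them from the DFS.
  def unregister(files: Seq[String]): Seq[String] =
    files.flatMap { f =>
      refCounts(f) -= 1
      if (refCounts(f) <= 0) { refCounts.remove(f); Some(f) } else None
    }
}

object ToySharedStateRegistryDemo {
  def main(args: Array[String]): Unit = {
    val registry = new ToySharedStateRegistry
    registry.register(Seq("00225.sst", "00226.sst"))                   // CP 1
    registry.register(Seq("00226.sst", "00228.sst", "00229.sst"))      // CP 2
    val deletable = registry.unregister(Seq("00225.sst", "00226.sst")) // drop CP 1
    println(deletable) // List(00225.sst): 00226.sst is still referenced by CP 2
  }
}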
Wrapping up
36
Incremental checkpointing benefits
▪ Incremental checkpoints can dramatically reduce the checkpointing overhead for large state.
▪ Incremental checkpoints are asynchronous.
▪ RocksDB's compaction consolidates the increments, which keeps the overhead for recovery low.
37
Incremental checkpointing limitations
▪ Breaks the unification of checkpoints and savepoints (CP: low overhead, SP: features).
▪ RocksDB-specific format.
▪ Currently no support for rescaling from an incremental checkpoint.
38
Further improvements in Flink 1.3/4
▪ AsyncHeapKeyedStateBackend (merged)
▪ AsyncHeapOperatorStateBackend (PR)
▪ MapState (merged)
▪ RocksDBInternalTimerService (PR)
▪ AsyncHeapInternalTimerService
39
Questions?
40