Improvements for large state in Apache Flink

Stefan Richter (@stefanrrichter)
April 11, 2017


State in Streaming Programs

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    case class Event(producer: String, evtType: Int, msg: String)
    case class Alert(msg: String, count: Long)

    env.addSource(…)
      .map(bytes => Event.parse(bytes))
      .keyBy("producer")
      .mapWithState { (event: Event, state: Option[Int]) =>
        // pattern rules: return (Alert, Option[Int]), the output and the updated state
      }
      .filter(alert => alert.msg.contains("CRITICAL"))
      .keyBy("msg")
      .timeWindow(Time.seconds(10))
      .sum("count")

[Figure: dataflow graph Source -> map() -> keyBy -> mapWithState() -> filter() -> keyBy -> window() / sum()]

Stateless: map(), filter()

Stateful: mapWithState(), window() / sum()

Internal vs External State

External State
• State in a separate data store
• Can scale "state capacity" independently
• Usually much slower than internal state
• Hard to get "exactly-once" guarantees

Internal State
• State in the stream processor
• Faster than external state
• Working area local to computation
• Checkpoints to stable store (DFS)
• Always exactly-once consistent
• Stream processor has to handle scalability

Keyed State Backends

HeapKeyedStateBackend
- State lives in memory, on Java heap
- Operates on objects
- Think of a hash map {key obj -> state obj}
- Async snapshots supported

RocksDBKeyedStateBackend
- State lives in off-heap memory and on disk
- Operates on bytes, uses serialization
- Think of K/V store {key bytes -> state bytes}
- Log-structured-merge (LSM) tree
- Async snapshots
- Incremental snapshots
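
The difference between "operates on objects" and "operates on bytes" can be illustrated with a minimal sketch. Everything here (`ToyKeyedState`, `ToyHeapState`, `ToyBytesState`) is a hypothetical toy written for this transcript, not Flink's KeyedStateBackend API, and the bytes flavour keeps its data in an ordinary in-memory map rather than in RocksDB.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

// Hypothetical minimal interface; not Flink's KeyedStateBackend API.
trait ToyKeyedState {
  def put(key: String, value: Long): Unit
  def get(key: String): Option[Long]
}

// Heap flavour: a hash map {key obj -> state obj}, no serialization on access.
final class ToyHeapState extends ToyKeyedState {
  private val table = mutable.HashMap.empty[String, Long]
  def put(key: String, value: Long): Unit = table.update(key, value)
  def get(key: String): Option[Long] = table.get(key)
}

// Bytes flavour: a K/V store {key bytes -> state bytes}; every access (de)serializes.
final class ToyBytesState extends ToyKeyedState {
  // Keys are wrapped in Seq[Byte] so that lookups compare content, not array identity.
  private val store = mutable.HashMap.empty[Seq[Byte], Array[Byte]]

  private def keyBytes(key: String): Seq[Byte] = key.getBytes(UTF_8).toSeq
  private def encode(value: Long): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).putLong(value).array()
  private def decode(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong

  def put(key: String, value: Long): Unit = store.update(keyBytes(key), encode(value))
  def get(key: String): Option[Long] = store.get(keyBytes(key)).map(decode)
}
```

Both classes answer the same calls, but the bytes flavour pays a serialization step on every read and write. In exchange its contents are plain byte sequences, which is what lets a store like RocksDB keep them off-heap and on disk.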

Asynchronous Checkpoints

Synchronous Checkpointing

[Figure: thread timelines. Job Manager: Checkpoint Coordinator thread (trigger checkpoint, acknowledge checkpoint). Task Manager: event processing thread (loop: processElement) and checkpointing thread (write state to DFS).]

Why is async checkpointing so essential for large state?

Problem: All event processing is on hold while the state is written to DFS, to avoid concurrent modifications to the state that is written.

Asynchronous Checkpointing

[Figure: thread timelines. Job Manager: Checkpoint Coordinator thread (trigger checkpoint, acknowledge checkpoint). Task Manager: event processing thread (loop: processElement, snapshot state) and checkpointing thread (write state to DFS).]

Problem: How to deal with concurrent modifications?
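
One common answer to the concurrent-modification question is to make the snapshot an immutable version of the state, so the writer never sees later updates. The sketch below shows that idea only; `ToyCountingOperator` and `AsyncCheckpointDemo` are hypothetical names, and this is not how Flink's backends are implemented.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical toy operator that counts events per key.
final class ToyCountingOperator {
  private implicit val ec: ExecutionContext = ExecutionContext.global

  // Immutable map: an update builds a new version and never changes an old one.
  private var state: Map[String, Long] = Map.empty

  // Called only from the event processing thread.
  def processElement(key: String): Unit =
    state = state.updated(key, state.getOrElse(key, 0L) + 1L)

  // Synchronous part: capture the current version (cheap, just a reference).
  // Asynchronous part: `write` runs on another thread against that frozen version.
  def triggerCheckpoint(write: Map[String, Long] => Unit): Future[Map[String, Long]] = {
    val snapshot = state
    Future { write(snapshot); snapshot }
  }
}

object AsyncCheckpointDemo {
  def run(): Map[String, Long] = {
    val op = new ToyCountingOperator
    op.processElement("a")
    op.processElement("a")
    val writeMayFinish = new CountDownLatch(1)
    // The simulated DFS write is held back until more events have been processed.
    val checkpoint = op.triggerCheckpoint(_ => writeMayFinish.await())
    op.processElement("a")
    op.processElement("b")
    writeMayFinish.countDown()
    Await.result(checkpoint, 5.seconds)
  }
}
```

`AsyncCheckpointDemo.run()` returns `Map("a" -> 2)`: the two events processed while the write was still pending do not leak into the checkpoint, and the processing loop never waited for the write.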

Incremental Checkpoints

What we will discuss

▪ What are incremental checkpoints?
▪ Why is RocksDB so well suited for this?
▪ How do we integrate this with Flink’s checkpointing?

Driven by: [two logos, not preserved in the transcript]

Full Checkpointing

[Figure: key -> state tables over time. Every checkpoint stores a complete copy of the state.]

Checkpoint 1: {2 -> B, 4 -> W, 6 -> N}
Checkpoint 2: {2 -> B, 3 -> K, 4 -> L, 6 -> N}
Checkpoint 3: {2 -> Q, 3 -> K, 6 -> N, 9 -> S}

Incremental Checkpointing

[Figure: key -> state tables over time. Every incremental checkpoint stores only the changes since the previous one; "-" marks a deleted key.]

State at checkpoint 1: {2 -> B, 4 -> W, 6 -> N}
State at checkpoint 2: {2 -> B, 3 -> K, 4 -> L, 6 -> N}
State at checkpoint 3: {2 -> Q, 3 -> K, 6 -> N, 9 -> S}

iCheckpoint 1, Δ(-,c1): {2 -> B, 4 -> W, 6 -> N}
iCheckpoint 2, Δ(c1,c2): {3 -> K, 4 -> L}
iCheckpoint 3, Δ(c2,c3): {2 -> Q, 4 -> -, 9 -> S}
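
To make the Δ notation concrete, here is a small sketch that derives an increment by comparing two consecutive states from the slide. The names `DeltaSketch` and `delta` are made up for illustration. Diffing whole state maps is an assumption of this toy only; as the later slides explain, Flink gets its increments from newly created SSTable files instead.

```scala
object DeltaSketch {
  // An increment maps each touched key to Some(newValue), or to None for a deleted key ("-").
  type Delta = Map[Int, Option[String]]

  def delta(previous: Map[Int, String], current: Map[Int, String]): Delta = {
    // Keys that are new or whose value changed.
    val upserts = current.collect {
      case (key, value) if !previous.get(key).contains(value) => key -> Option(value)
    }
    // Keys that existed before and are gone now.
    val deletions = (previous.keySet -- current.keySet).map(key => key -> Option.empty[String])
    upserts ++ deletions
  }

  // The three states shown on the slide.
  val c1: Map[Int, String] = Map(2 -> "B", 4 -> "W", 6 -> "N")
  val c2: Map[Int, String] = Map(2 -> "B", 3 -> "K", 4 -> "L", 6 -> "N")
  val c3: Map[Int, String] = Map(2 -> "Q", 3 -> "K", 6 -> "N", 9 -> "S")
}
```

With these inputs, `delta(c1, c2)` contains only keys 3 and 4, and `delta(c2, c3)` contains keys 2, 4 (as a deletion) and 9, matching the increments on the slide.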

Incremental Recovery

Recovery? Replay the increments in order:

Δ(-,c1) + Δ(c1,c2) + Δ(c2,c3) = {2 -> Q, 3 -> K, 6 -> N, 9 -> S}
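
Recovery by summing increments can be sketched as a fold over the deltas in checkpoint order. The `RecoverySketch` object and its method names are hypothetical; the data is the slide's example, with `None` standing for the "-" deletion marker.

```scala
object RecoverySketch {
  type Delta = Map[Int, Option[String]]

  // Apply one increment: Some(v) upserts the key, None removes it.
  def applyDelta(state: Map[Int, String], delta: Delta): Map[Int, String] =
    delta.foldLeft(state) {
      case (acc, (key, Some(value))) => acc.updated(key, value)
      case (acc, (key, None))        => acc - key
    }

  // Recovery replays all increments in checkpoint order, starting from empty state.
  def recover(increments: Seq[Delta]): Map[Int, String] =
    increments.foldLeft(Map.empty[Int, String])(applyDelta)

  val increments: Seq[Delta] = Seq(
    Map(2 -> Some("B"), 4 -> Some("W"), 6 -> Some("N")), // Δ(-,c1)
    Map(3 -> Some("K"), 4 -> Some("L")),                 // Δ(c1,c2)
    Map(2 -> Some("Q"), 4 -> None, 9 -> Some("S"))       // Δ(c2,c3)
  )
}
```

`recover(increments)` yields the state at checkpoint 3. The order matters: replaying Δ(c2,c3) before Δ(c1,c2) would bring the deleted key 4 back.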

RocksDB Architecture (simplified)

[Figure: the Memtable lives in memory; a periodic flush turns it into SSTable files in storage (SSTable-6, SSTable-7), and a periodic merge combines SSTables. Each SSTable holds key/value pairs (key_1 val_1, key_2 val_2, …) sorted by key, plus an index.]

Memtable
- All writes go against Memtable
- Mutable Buffer (couple MB)
- Unique keys

SSTables
- Reads consider Memtable first, then SSTables
- Immutable
- We can consider newly created SSTables as Δs!
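
The write and read paths described above can be modelled in a few lines. This is a hypothetical in-memory toy (`ToyLsm`), not RocksDB's API: flushes are triggered by hand, nothing touches disk, and there is no index.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical in-memory model of the simplified architecture.
final class ToyLsm {
  // Buffer with unique keys, sorted by key; None is a deletion marker.
  private var memtable = TreeMap.empty[Int, Option[String]]
  // Immutable sorted runs, newest first. Each flush adds one: a new Δ.
  private var sstables = List.empty[TreeMap[Int, Option[String]]]

  // All writes go against the memtable.
  def put(key: Int, value: String): Unit = memtable = memtable.updated(key, Some(value))
  def delete(key: Int): Unit = memtable = memtable.updated(key, None)

  // Flush: freeze the memtable as the newest SSTable and start a fresh one.
  def flush(): Unit = if (memtable.nonEmpty) {
    sstables = memtable :: sstables
    memtable = TreeMap.empty[Int, Option[String]]
  }

  // Reads consider the memtable first, then SSTables from newest to oldest.
  // The first table that knows the key decides, even if it holds a deletion marker.
  def get(key: Int): Option[String] =
    (memtable :: sstables).iterator
      .map(_.get(key))
      .collectFirst { case Some(hit) => hit }
      .flatten

  def sstableCount: Int = sstables.size
}
```

Because flushed tables are never modified, the set of SSTables created since the previous checkpoint is exactly the new data, which is the property the "SSTables as Δs" bullet relies on.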

RocksDB Compaction

▪ Background Thread merges SSTable files
▪ Removes copies of the same key (latest version survives)
▪ Actual deletion of keys

[Figure: SSTable-1 and SSTable-2 are merged into SSTable-3; "-" marks a deleted key.]

SSTable-1: {2 -> C, 7 -> N, 9 -> Q}
SSTable-2: {1 -> V, 7 -> -, 9 -> S}
SSTable-3 (merge): {1 -> V, 2 -> C, 9 -> S}

Compaction consolidates our Δs!
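
The merge on this slide can be reproduced with a short sketch. `CompactionSketch` and `compact` are hypothetical names, and the toy merges every table at once, which is the only reason it may drop deletion markers unconditionally; a real LSM store has to be more careful about when a marker can go.

```scala
import scala.collection.immutable.TreeMap

object CompactionSketch {
  type SSTable = TreeMap[Int, Option[String]] // None marks a deleted key ("-")

  // Merge tables given oldest first: later entries overwrite earlier ones, so the
  // latest version of each key survives. Deletion markers are then dropped for good.
  def compact(oldestFirst: Seq[SSTable]): SSTable =
    oldestFirst
      .foldLeft(TreeMap.empty[Int, Option[String]]) { (merged, table) => merged ++ table }
      .filter { case (_, value) => value.isDefined }

  // The tables from the slide.
  val sstable1: SSTable = TreeMap(2 -> Some("C"), 7 -> Some("N"), 9 -> Some("Q"))
  val sstable2: SSTable = TreeMap(1 -> Some("V"), 7 -> None, 9 -> Some("S"))
  val sstable3: SSTable = compact(Seq(sstable1, sstable2))
}
```

`sstable3` holds keys 1, 2 and 9 with values V, C and S: key 7 is gone entirely and key 9 keeps its newer value, as in the figure.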

Flink’s Incremental Checkpointing

[Figure: components are the Checkpoint Coordinator, the SharedStateRegistry, three parallel instances StatefulMap (1/3), (2/3) and (3/3), the network, and the DFS.]

Step 1: Checkpoint Coordinator sends checkpoint barrier that triggers a snapshot on each instance.

Step 2: Each instance writes its incremental snapshot to distributed storage.

[Figure: the increments Δ1, Δ2 and Δ3 of the three instances are now in the DFS.]

Incremental Snapshot of Operator

[Figure, three build steps, with the Local FS on one side and Distributed FS://sfmap/1/ plus the SharedState Registry on the other:
1. Local FS: the data directory holds the manifest, 00225.sst and 00226.sst. The Distributed FS has a share directory.
2. copy / hardlink: the data directory is snapshotted into a local chk-1 directory (manifest, 00225.sst, 00226.sst).
3. async upload to DFS: 00225.sst and 00226.sst go to share; manifest and sst.list go to chk-1 on the Distributed FS. sst.list is the list of SSTables referenced by snapshot.]

Flink’s Incremental Checkpointing

Step 3: Each instance acknowledges and sends a handle (e.g. file path in DFS) to the Checkpoint Coordinator.

[Figure: handles H1, H2 and H3 travel over the network to the Checkpoint Coordinator; Δ1, Δ2 and Δ3 remain in the DFS.]

Incremental Snapshot of Operator

[Figure: same layout as before. On Distributed FS://sfmap/1/, share holds 00225.sst and 00226.sst, and chk-1 holds manifest and sst.list. The SharedState Registry now shows one reference per shared file.]

SharedState Registry: {00225.sst = 1} {00226.sst = 1}

Flink’s Incremental Checkpointing

Step 4: Checkpoint Coordinator signals CP1 success to all instances.

[Figure: the Checkpoint Coordinator stores CP 1, made up of the handles H1, H2 and H3; Δ1, Δ2 and Δ3 remain in the DFS.]

What happens when another CP is triggered?

Incremental Snapshot of Operator

[Figure, three build steps for the second checkpoint:
1. Local FS: the data directory now holds the manifest, 00226.sst, 00228.sst and 00229.sst. On Distributed FS://sfmap/1/, share still holds 00225.sst and 00226.sst, and chk-1 holds manifest and sst.list.
2. The data directory is snapshotted into a local chk-2 directory (manifest, 00226.sst, 00228.sst, 00229.sst).
3. upload missing SSTable files: 00228.sst and 00229.sst are added to share; manifest and sst.list go to chk-2 on the Distributed FS.]

SharedState Registry before the upload: {00225.sst = 1} {00226.sst = 1}
SharedState Registry after the upload: {00225.sst = 1} {00226.sst = 2} {00228.sst = 1} {00229.sst = 1}
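
The "upload missing SSTable files" step comes down to a set difference between the files in the local snapshot and the files already in the share directory. The sketch below uses hypothetical names (`UploadSketch`, `SnapshotPlan`, `plan`) and plain file names instead of real file handles.

```scala
object UploadSketch {
  // Result of planning one incremental snapshot (hypothetical type for this sketch).
  final case class SnapshotPlan(toUpload: Set[String], sstList: Set[String])

  // localSnapshot: SSTable files present in the local chk-N directory.
  // alreadyShared: files that earlier checkpoints already put into the DFS share directory.
  def plan(localSnapshot: Set[String], alreadyShared: Set[String]): SnapshotPlan =
    SnapshotPlan(toUpload = localSnapshot -- alreadyShared, sstList = localSnapshot)

  // The two checkpoints from the slides.
  val chk1: SnapshotPlan = plan(Set("00225.sst", "00226.sst"), alreadyShared = Set.empty)
  val chk2: SnapshotPlan =
    plan(Set("00226.sst", "00228.sst", "00229.sst"), alreadyShared = chk1.sstList)
}
```

For the second checkpoint only 00228.sst and 00229.sst need to be uploaded, while its sst.list still names 00226.sst, which is why that file's count in the registry goes up to 2.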

Deleting Incremental Checkpoints

[Figure: the Checkpoint Coordinator holds CP 1 and CP 2, each made up of handles H1, H2 and H3; the DFS holds the increments Δ1, Δ2 and Δ3 of both checkpoints.]

Deleting an outdated checkpoint

Deleting Incremental Snapshot

[Figure, three build steps on Distributed FS://sfmap/1/:
1. Before the deletion, chk-1 and chk-2 both hold manifest and sst.list, and share holds 00225.sst, 00226.sst, 00228.sst and 00229.sst.
2. chk-1 is deleted and the reference counts of its files are decremented.
3. 00225.sst is no longer referenced and is removed from share.]

SharedState Registry, step 1: {00225.sst = 1} {00226.sst = 2} {00228.sst = 1} {00229.sst = 1}
SharedState Registry, step 2: {00225.sst = 0} {00226.sst = 1} {00228.sst = 1} {00229.sst = 1}
SharedState Registry, step 3: {00226.sst = 1} {00228.sst = 1} {00229.sst = 1}
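
The counters in the SharedState Registry behave like plain reference counting. The following sketch mirrors the numbers on the slides; `ToySharedStateRegistry` and its methods are hypothetical and are not the interface of Flink's SharedStateRegistry.

```scala
import scala.collection.mutable

// Hypothetical toy registry; it only mirrors the counters shown on the slides.
final class ToySharedStateRegistry {
  private val refCounts = mutable.Map.empty[String, Int]

  // A completed checkpoint registers every SSTable file named in its sst.list.
  def register(sstList: Set[String]): Unit =
    sstList.foreach(file => refCounts.update(file, refCounts.getOrElse(file, 0) + 1))

  // Deleting a checkpoint releases its references. The result names the files whose
  // count dropped to zero, i.e. the ones that may now be removed from the share directory.
  def unregister(sstList: Set[String]): Set[String] = {
    val unreferenced = Set.newBuilder[String]
    sstList.foreach { file =>
      refCounts.get(file).foreach { count =>
        if (count > 1) refCounts.update(file, count - 1)
        else {
          refCounts.remove(file)
          unreferenced += file
        }
      }
    }
    unreferenced.result()
  }

  def counts: Map[String, Int] = refCounts.toMap
}
```

Registering the sst.list of chk-1 and then of chk-2 gives the counts from step 1; unregistering chk-1 returns only 00225.sst, while 00226.sst stays because chk-2 still references it.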

Wrapping up

Incremental checkpointing benefits

▪ Incremental checkpoints can dramatically reduce CP overhead for large state.
▪ Incremental checkpoints are async.
▪ RocksDB’s compaction consolidates the increments. Keeps overhead low for recovery.

Incremental checkpointing limitations

▪ Breaks the unification of checkpoints and savepoints (CP: low overhead, SP: features)
▪ RocksDB specific format.
▪ Currently no support for rescaling from incremental checkpoint.

Further improvements in Flink 1.3/1.4

▪ AsyncHeapKeyedStateBackend (merged)
▪ AsyncHeapOperatorStateBackend (PR)
▪ MapState (merged)
▪ RocksDBInternalTimerService (PR)
▪ AsyncHeapInternalTimerService

Questions?