DISTRIBUTED ALGORITHMS FOR BIG DATA @ GeeCON
TRANSCRIPT
@doanduyhai — Cassandra Technical Advocate, DataStax
Who am I? Duy Hai Doan, Cassandra technical advocate
• talks, meetups, conferences
• open-source development (Achilles, …)
• OSS Cassandra point of contact
☞ [email protected] ☞ @doanduyhai
Agenda
• High cardinality estimation with HyperLogLog
• Distributed consensus with Paxos
HyperLogLog
Philippe Flajolet, 2007
The challenge: count the number of distinct elements, concurrently, in a high-cardinality data set
Some possible solutions
Data structure | Required space (bytes) | Estimated cardinality | Error margin
Java HashSet | 10 447 016 (≈10 MB) | 67 801 | 0%
Linear Probabilistic Counter | 3 384 (≈3 kB) | 67 080 | 1%
HyperLogLog | 512 | 70 002 | 3%
Credits: http://highscalability.com/
Let’s play a game
Rolling dice (attempt 1)
[Bar chart: occurrences of each die face 1–6; counts range roughly from 0 to 20]
100 rounds
Rolling dice (attempt 2)
[Bar chart: occurrences of each die face 1–6; counts range roughly from 0 to 200]
10³ rounds
Rolling dice (attempt 3)
[Bar chart: occurrences of each die face 1–6; counts cluster around 160 000–170 000]
10⁶ rounds
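Not part of the deck, but the three charts are easy to reproduce yourself; here is a small Java sketch (the class name is my own) that rolls a fair die 10², 10³ and 10⁶ times and prints the per-face counts, showing how the distribution flattens out as the number of rounds grows:

```java
import java.util.Random;

public class DiceRolls {
    public static void main(String[] args) {
        Random rng = new Random();
        for (int rounds : new int[] {100, 1_000, 1_000_000}) {
            long[] counts = new long[6];              // occurrences of faces 1..6
            for (int i = 0; i < rounds; i++) {
                counts[rng.nextInt(6)]++;             // one fair roll
            }
            System.out.println(rounds + " rounds:");
            for (int face = 0; face < 6; face++) {
                System.out.println("  face " + (face + 1) + ": " + counts[face]);
            }
        }
    }
}
```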
LogLog Algorithm Before HyperLogLog,
there was … LogLog
LogLog algorithm (simplified)
1) Choose a hash function H whose output is uniformly distributed
2) For each incoming element in the data set (article_id, login, uuid…), apply H
3) Convert the hash into a binary sequence
4) Estimate the cardinality by observing the binary sequences
0111010010101… 0010010010001… 1010111001100…
…
LogLog intuition
Uniform probability:
50% of the bit sequences start with 0xxxxx
50% of the bit sequences start with 1xxxxx
1/4 of the bit sequences start with 00xxxxx
1/4 of the bit sequences start with 01xxxxx
1/4 of the bit sequences start with 10xxxxx
1/4 of the bit sequences start with 11xxxxx
LogLog intuition
Look for the position r of the first bit set to 1, starting from the left:
000000001xxxx → r = 9
0001xxxxxxxxx → r = 4
000001xxxxxxx → r = 6
000000…0001xxxxxxx  (rank r)
LogLog intuition
There are 2^r possible bit sequences of length r:
000…0001, 000…0010, 000…0011, …, 111…1111
000000…0001xxxxxxx  (rank r)
LogLog intuition
Uniform probability:
1/2^r of the bit sequences start with 000000…0001xxx
1/2^r of the bit sequences start with 000000…0010xxx
…
1/2^r of the bit sequences start with 111111…1111xxx
Reversing the logic
Reversing the logic
I have the same chance of observing 000000…0001xxx
as of observing 000000…0010xxx,
as of observing 000000…0011xxx, etc.
Reversing the logic
If I have observed 000000…0001xxx,
I have probably also observed 000000…0010xxx,
and probably 000000…0011xxx, etc.
Reversing the logic
If I have observed 000000…0001xxx of rank r …
… there are probably about 2^r distinct bit sequences in the data set
☞ estimated cardinality
LogLog formula
Look for the largest rank max(r), i.e. the longest prefix 0000…01xxx, observed among all binary sequences
cardinality n ≈ 2^max(r)
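To make steps 1–4 and the formula n ≈ 2^max(r) concrete, here is a minimal LogLog-style sketch (my own illustration, not from the talk; the class name is mine and String.hashCode stands in for the well-distributed hash H, which a real implementation would replace with a stronger hash):

```java
import java.util.List;

public class LogLogSketch {
    private int maxRank = 0; // largest rank observed so far

    // rank = position of the first 1-bit, counted from the left of a 32-bit hash
    private static int rank(int hash) {
        return Integer.numberOfLeadingZeros(hash) + 1;
    }

    public void offer(String element) {
        int h = element.hashCode();        // stand-in for a well-distributed hash H
        if (h == 0) return;                // rank is undefined for an all-zero hash, skip it
        maxRank = Math.max(maxRank, rank(h));
    }

    public long estimate() {
        return 1L << maxRank;              // n ≈ 2^max(r)
    }

    public static void main(String[] args) {
        LogLogSketch sketch = new LogLogSketch();
        for (String s : List.of("article_42", "login_alice", "uuid_7f3a", "article_42")) {
            sketch.offer(s);
        }
        System.out.println("estimated cardinality ≈ " + sketch.estimate());
    }
}
```

A single max(r) is exactly the fragile statistic the pitfall on the next slide is about.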
LogLog pitfall
Example: 1000 distinct elements
0010000100xxxxxxxxxx
0011001010xxxxxxxxxx
0000000001xxxxxxxxxx
…
000000000000001xxxxx → rank r = 15, n ≈ 2^15 = 32768 WRONG!
…
1100110100xxxxxxxxxx
LogLog pitfall: statistical outliers
HyperLogLog idea
1) Eliminate and smooth out outlying values ☞ use the harmonic mean
H = n / (1/x_1 + 1/x_2 + … + 1/x_n)
Credits: Wikipedia
HyperLogLog idea
Example: harmonic mean of 3, 6, 7, 12 and 120
Arithmetic mean = 29.6, but …
H = 5 / (1/3 + 1/6 + 1/7 + 1/12 + 1/120) ≈ 6.80
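A quick way to check that arithmetic (not in the deck; plain Java, the names are mine):

```java
public class HarmonicMean {
    // H = n / (1/x_1 + 1/x_2 + ... + 1/x_n)
    static double harmonicMean(double... xs) {
        double sumOfInverses = 0.0;
        for (double x : xs) {
            sumOfInverses += 1.0 / x;
        }
        return xs.length / sumOfInverses;
    }

    public static void main(String[] args) {
        // The outlier 120 barely moves the harmonic mean
        System.out.println(harmonicMean(3, 6, 7, 12, 120)); // ≈ 6.8
    }
}
```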
HyperLogLog idea
2) Distribute the computation (« divide and conquer »)
☞ apply LogLog to 2^p buckets
p = prefix length (here 6), bucket count = 2^p (here 64)
[Diagram: the first p bits of each hash (e.g. 101101 in 101101000001xxxx) route the element from the input data stream to one of the 64 buckets B1…B64: prefix 000000xxxx → B1, 000001xxxx → B2, …, 111111xxxx → B64]
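Here is a small sketch of that routing step, under my own assumptions (a 32-bit hash and p = 6): the top p bits pick the bucket, the remaining bits feed the rank computation.

```java
public class BucketRouting {
    static final int P = 6;                    // prefix length
    static final int BUCKET_COUNT = 1 << P;    // 2^p = 64 buckets

    public static void main(String[] args) {
        int hash = 0b10110100_00000001_00000000_00000000;  // example 32-bit hash
        int bucket = hash >>> (32 - P);                    // top p bits select the bucket
        int rest   = hash << P;                            // remaining bits, used for the rank
        int rank   = Integer.numberOfLeadingZeros(rest) + 1;
        System.out.println("bucket = " + bucket + " / " + BUCKET_COUNT + ", rank = " + rank);
    }
}
```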
HyperLogLog idea 3) Apply LogLog on each bucket
p = bucket prefix
r = rank for LogLog
HyperLogLog formula
For each bucket i, we compute the cardinality estimate for this bucket, M_i:
M_i ≈ 2^max(r_i), where max(r_i) = max rank found in bucket i
HyperLogLog formula
Each bucket sees roughly n/b of the distinct elements, so the harmonic mean H(M_i) computed on all the M_i satisfies:
H(M_i) ≈ n / b
n = global cardinality estimate (what we look for), b = number of buckets
☞ n ≈ b · H(M_i)
HyperLogLog, the maths
H(x_i) = b / (1/x_1 + 1/x_2 + … + 1/x_b)
       = b · ( Σ_{i=1..b} 1/x_i )^(-1)
       = b · ( Σ_{i=1..b} x_i^(-1) )^(-1)
HyperLogLog, the maths
We replace the x_i in the previous formula by M_i:
H(M_i) = b · ( Σ_{i=1..b} M_i^(-1) )^(-1)
Then we replace each M_i by 2^max(r_i):
H(M_i) = b · ( Σ_{i=1..b} 2^(-max(r_i)) )^(-1)
HyperLogLog, the maths
Inject H(M_i) into the cardinality estimate n ≈ b · H(M_i):
n ≈ α_b · b² · ( Σ_{i=1..b} 2^(-max(r_i)) )^(-1)
n = cardinality estimate
b = bucket count
α_b = corrective constant
max(r_i) = max rank observed in bucket i
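Putting the buckets, ranks, harmonic mean and α_b together, here is a compact illustrative HyperLogLog sketch (mine, not the speaker's): 64 buckets, String.hashCode as a stand-in hash, α_64 ≈ 0.709, and none of the small/large-range corrections a production implementation would add.

```java
public class HyperLogLogSketch {
    private static final int P = 6;                  // prefix length
    private static final int B = 1 << P;             // 2^p = 64 buckets
    private static final double ALPHA = 0.709;       // corrective constant α_b for 64 buckets

    private final int[] maxRank = new int[B];        // max rank observed per bucket

    public void offer(String element) {
        int h = element.hashCode();                  // stand-in for a good hash H
        int bucket = h >>> (32 - P);                 // first p bits pick the bucket
        int rest = h << P;                           // remaining bits carry the rank
        int rank = (rest == 0) ? (32 - P + 1) : Integer.numberOfLeadingZeros(rest) + 1;
        maxRank[bucket] = Math.max(maxRank[bucket], rank);
    }

    public double estimate() {
        double sum = 0.0;
        for (int r : maxRank) {
            sum += Math.pow(2.0, -r);                // Σ 2^(-max(r_i))
        }
        return ALPHA * B * B / sum;                  // n ≈ α_b · b² · (Σ 2^(-max(r_i)))^(-1)
    }

    public static void main(String[] args) {
        HyperLogLogSketch hll = new HyperLogLogSketch();
        for (int i = 0; i < 100_000; i++) {
            hll.offer("user_" + i);                  // 100 000 distinct elements
        }
        System.out.printf("estimated cardinality ≈ %.0f%n", hll.estimate());
    }
}
```

With only 64 buckets the expected error is about 1.04/√64 ≈ 13%, which is why the accuracy table further down uses 256 to 2048 buckets.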
HyperLogLog space requirement
Hash function length | Max estimated cardinality | Bucket size (bits)
16 bits | 2^16 = 65 536 | log2(16) = 4
32 bits | 2^32 ≈ 4.2 × 10^9 | log2(32) = 5
64 bits | 2^64 ≈ 1.8 × 10^19 | log2(64) = 6
max_estimated_cardinality = 2^hash_function_length
bucket_size = log2(hash_function_length)
required_space = bucket_count × bucket_size
HyperLogLog accuracy
Bucket count | Required space (bytes) | Accuracy
256 | 16 bits: 128, 32 bits: 160, 64 bits: 192 | 6.25%
512 | 16 bits: 256, 32 bits: 320, 64 bits: 384 | 4.59%
1024 | 16 bits: 512, 32 bits: 640, 64 bits: 768 | 3.25%
2048 | 16 bits: 1k, 32 bits: 1.25k, 64 bits: 1.5k | 2.29%
accuracy ≈ 1.04 / √(bucket_count)
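A quick sanity check of the two formulas above (a sketch of mine, not from the deck), for 1024 buckets and a 64-bit hash:

```java
public class HllSizing {
    public static void main(String[] args) {
        int bucketCount = 1024;
        int hashLength = 64;                                         // bits
        int bucketSize = 31 - Integer.numberOfLeadingZeros(hashLength);  // log2(64) = 6 bits
        int requiredBits = bucketCount * bucketSize;
        System.out.println("required space = " + requiredBits / 8 + " bytes");            // 768 bytes
        System.out.printf("accuracy ≈ %.2f%%%n", 100 * 1.04 / Math.sqrt(bucketCount));    // ≈ 3.25%
    }
}
```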
Which use-cases?
Number of unique visitors on a high-traffic web site
Number of unique clicks on popular articles/items
Top-N elements (visitors, items, …)
…
Some real-world implementations
Apache Cassandra: distributed table size estimate
Redis: out-of-the-box data structure
DataFu (Apache Pig): standard UDF
Twitter Algebird: algorithms library for Storm & Scalding
Paxos
Leslie Lamport, 1989
The challenge: find a consensus, in a distributed system, in the presence of random failures (hardware, network, …)
2-phase commit?
• blocking protocol by nature
• requires human intervention if the transaction manager goes down
3-phase commit?
• inconsistent state in case of a split-brain network partition
Paxos
• 2.5 network round-trips
• 3 roles: Proposer, Acceptor, Learner
• needs a quorum of responses
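The talk does not show code; as a point of reference for the phases that follow, here is one way the four message types of a round could be written down in Java (my own sketch — the record and field names are assumptions, not part of the talk):

```java
// The four message types exchanged in one Paxos round.
// n is the proposal/sequence number, val the proposed value.
public interface PaxosMessage {
    record Prepare(long n) implements PaxosMessage {}
    record Promise(long n, Long acceptedN, String acceptedVal) implements PaxosMessage {} // acceptedN/acceptedVal may be null (∅)
    record Accept(long n, String val) implements PaxosMessage {}
    record Accepted(long n, String val) implements PaxosMessage {}
}
```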
Paxos phase 1: prepare
n = sequence number
[Diagram: the client asks the proposer for consensus on value val; the proposer sends prepare(n) to all 5 acceptors]
Paxos phase 1: promise
[Diagram: each of the 5 acceptors replies promise() to the proposer]
Paxos phase 2: accept
val = target consensus value
[Diagram: the proposer sends accept(n,val) to all 5 acceptors]
Paxos phase 2: accepted
[Diagram: each acceptor replies accepted(n,val) to the proposer — val is accepted]
Paxos phase 2.5: learn
learner = durable storage
[Diagram: the proposer sends "store val" to the learners]
Paxos phase 1: prepare
The proposer:
• picks a monotonically increasing sequence number n (e.g. a timeuuid)
• sends prepare(n) to all acceptors
Proposer → Acceptor: prepare(n)
Paxos phase 1: promise
Each acceptor, upon receiving a prepare(n):
• if it has not sent any accepted(m,?) or promise(m,val_m) with m > n
☞ return promise(n,∅) and store n locally
☞ promise not to accept any prepare(o) or accept(o,?) with o < n
Acceptor → Proposer: promise(n,∅)  (acceptor stores n,∅)
Paxos phase 1: promise
Each acceptor, upon receiving a prepare(n):
• if it has already sent an accepted(m,val_m) with m < n
☞ return promise(m,val_m)
Acceptor → Proposer: promise(m,val_m)  (acceptor has stored m,val_m)
Paxos phase 1: promise
Each acceptor, upon receiving a prepare(n):
• if it has sent accepted(m,?) or promise(m,?) with m > n
☞ ignore the request, or return a Nack (optimization)
Acceptor → Proposer: Nack
Paxos phase 1 objectives
• discover any pending action and make it progress
• block old, stalled proposals
The proposer asks for a plebiscite (prepare); the acceptors grant allegiance (promise)
Proposer: "Who's the boss?" — Acceptor: "You sir!"
Paxos phase 2: accept
The proposer receives a quorum of promise(m_i, val_mi):
• if all promises are promise(n, ∅), send accept(n, val_n) with its own value val_n
• otherwise, take the val_mi of the biggest m_i and send accept(n, val_max(mi))
Proposer → Acceptor: accept(n, val_max(mi)) OR accept(n, val_n)
Paxos phase 2: accepted
Each acceptor, upon receiving an accept(n,val):
• if it has not made any promise(m,?) with m > n
☞ return accepted(n,val) and store val locally
• else, ignore the request
Acceptor → Proposer: accepted(n,val)  (acceptor stores n,val)
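The acceptor rules from the three promise cases and the accepted case above can be condensed into a small state machine. The following is an illustrative Java sketch of my own, not the talk's: it keeps all state in memory (no durable storage, no networking) and returns replies as plain strings.

```java
// Illustrative Paxos acceptor following the rules above (a sketch: no persistence, no networking).
public class Acceptor {
    private long promisedN = -1;       // highest n promised so far (-1 = none)
    private long acceptedN = -1;       // highest n accepted so far (-1 = none)
    private String acceptedVal = null; // value accepted with acceptedN, if any

    /** Phase 1: prepare(n) -> promise(...) or Nack. */
    public synchronized String onPrepare(long n) {
        if (promisedN > n || acceptedN > n) {
            return "Nack";                                            // already committed to a higher round
        }
        promisedN = n;                                                // store n locally
        if (acceptedVal == null) {
            return "promise(" + n + ", ∅)";                           // nothing accepted yet
        }
        return "promise(" + acceptedN + ", " + acceptedVal + ")";     // report the value already accepted
    }

    /** Phase 2: accept(n, val) -> accepted(...) or null (ignored). */
    public synchronized String onAccept(long n, String val) {
        if (promisedN > n) {
            return null;                                              // promised a higher round: ignore
        }
        acceptedN = n;
        acceptedVal = val;                                            // store val locally
        return "accepted(" + n + ", " + val + ")";
    }
}
```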
Paxos phase 2.5: learn
The proposer receives a quorum of accepted(n,val)
• it sends val to the learners (durable storage)
The consensus is reached and its value is val
This completes one round of Paxos
Proposer → Learner: store val
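And the proposer side of one round, again as a personal sketch under simplifying assumptions: the Acceptor objects from the previous sketch are called directly instead of over the network, the promises are parsed from their string form, and there is no retry, learner durability or random back-off.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative single-round Paxos proposer built on the Acceptor sketch above.
public class Proposer {
    public static String runRound(List<Acceptor> acceptors, long n, String myValue) {
        int quorum = acceptors.size() / 2 + 1;

        // Phase 1: prepare(n) to everybody, collect the promises
        List<String> promises = new ArrayList<>();
        for (Acceptor a : acceptors) {
            String reply = a.onPrepare(n);
            if (reply.startsWith("promise")) promises.add(reply);
        }
        if (promises.size() < quorum) return null;             // no quorum: abandon this round

        // Pick the value: the value of the highest already-accepted proposal wins over our own
        long bestM = -1;
        String value = myValue;
        for (String p : promises) {
            if (!p.contains("∅")) {                             // promise(m, val_m) carries a value
                long m = Long.parseLong(p.substring("promise(".length(), p.indexOf(",")));
                if (m > bestM) {
                    bestM = m;
                    value = p.substring(p.indexOf(", ") + 2, p.length() - 1);
                }
            }
        }

        // Phase 2: accept(n, value), count the accepted replies
        int acceptedCount = 0;
        for (Acceptor a : acceptors) {
            if (a.onAccept(n, value) != null) acceptedCount++;
        }
        return acceptedCount >= quorum ? value : null;          // consensus value, or null
    }

    public static void main(String[] args) {
        List<Acceptor> acceptors = List.of(new Acceptor(), new Acceptor(), new Acceptor(),
                                           new Acceptor(), new Acceptor());
        System.out.println("consensus = " + runRound(acceptors, 1, "val_a"));
    }
}
```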
Paxos phase 2 objectives
• commit any pending proposal
• learn the consensus value
The proposer issues a proposal (accept); the acceptors accept it (accepted)
Proposer: "Accept this!" — Acceptor: "Yes sir!"
Formal Paxos limits
• once a consensus value val is reached, it can't be changed!
• val must be reset to start another Paxos round
Multi-Paxos
• many rounds of Paxos run in parallel, impacting different partitions
• each server can be Proposer, Acceptor & Learner
Fast-Paxos, Egalitarian-Paxos, etc.
Conflict cases: failure of a minority of acceptors
[Message-sequence diagram, acceptors a1…a5: prepare(n1) is sent to all five, all reply promise(∅); accept(n1,a) is then sent, but two acceptors fail (☠); the remaining three still return accepted(a) — a quorum, so the round succeeds ✔]
Conflict cases: stalled Paxos round committed by a subsequent round
[Message-sequence diagram, acceptors a1…a5: prepare(n1) gets five promise(∅), but the accept(n1,a) phase stalls — only two acceptors return accepted(a) before failures (☠). A later round sends prepare(n2) to all five: three reply promise(∅) and two reply promise(n1,a). The new proposer must therefore pick the already-accepted value a and sends accept(n2,a) to all five, which commits it ✔]
Conflict cases: random failures — the last plebiscite wins!
[Message-sequence diagram, acceptors a1…a5: two proposers run concurrently. The first sends prepare(n1) and collects promise(∅) from a quorum; the second then sends prepare(n2), n2 > n1, and also collects promise(∅). Some accept(n1,a) messages are lost (⚡) and the remaining ones are rejected with Nack, because the acceptors have since promised n2. accept(n2,b) is accepted by a quorum (accepted(n2,b)), so proposal (n2,b) wins ✔ and proposal (n1,a) fails ❌]
Conflict cases: two proposers blocking each other (livelock)
[Message-sequence diagram, acceptors a1…a5: proposer 1 sends prepare(n1) and proposer 2 sends prepare(n2), with n2 > n1; both collect promise(∅). Every accept(n1,a) is then rejected with Nack because the acceptors have promised n2, so proposer 1 retries with prepare(n3), n3 > n2; every accept(n2,b) is in turn rejected with Nack, and so on — neither proposal ever commits]
Conflict cases: solving the livelock with a random sleep
[Same message-sequence diagram as above: the duel is broken by having a proposer wait for a random delay before retrying with a higher sequence number, so the two proposers stop preempting each other]
Which use-cases?
Reliable master election for master/slave architectures
Distributed consensus
Distributed compare-and-swap
Distributed locks
Some real-world implementations
Apache Cassandra: lightweight transactions
Google Chubby/Spanner: locks / distributed transactions
Heroku: via Doozerd, for distributed configuration data
Neo4j (≥ 1.9): replaces Apache ZooKeeper for high availability
Cassandra Lightweight Transactions
Q & A
Thank You !