Accumulo Summit 2014: Accismus -- Percolating with Accumulo
DESCRIPTION
Talk Info
Title: Percolating with Accumulo
Abstract: A talk about conditional mutations and Accismus (a Percolator prototype) covering the following topics.
* Conditional mutation use cases and overview
* Conditional mutation implementation
* Percolator overview and use cases
* Percolator implementation
TRANSCRIPT
Accismus
A Percolator implementation using Accumulo
Keith Turner
Accismus
A form of irony where one pretends indifference and refuses something while actually wanting it.
Google's Problem
● Use M/R to process ~10^15 bytes
● ~10^12 bytes new data arrive
● Use M/R to process 10^15 + 10^12 bytes
● High latency before new data available for query
Solution
● Percolator: incremental processing for big data
  – Layer on top of BigTable
  – Offers fault tolerant, cross row transactions
    ● Lazy recovery
  – Offers snapshot isolation
    ● Only read committed data
  – Uses BigTable data model, except timestamp
    ● Accismus adds visibility
  – Has own API
Observers
● User defined function that executes a transaction
● Triggered when a user defined column is modified (called notification in paper)
● Guarantee only one transaction will execute per notification
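A minimal sketch of the observer contract described above. The names (Observer, notifyColumn, runWorkers) are illustrative, not the real Accismus API; the point is that a notification is claimed atomically, so only one transaction executes per notification even when several workers race for it.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch of observers: a user-defined function runs a
// transaction when its watched column is modified, and claiming the
// notification atomically guarantees exactly one execution per notification.
class ObserverSketch {
    interface Observer {
        void process(String row, String column);
    }

    // Pending notifications (row:column keys).
    private final Set<String> notifications = ConcurrentHashMap.newKeySet();
    private final Map<String, Observer> observers = new HashMap<>();
    int executions = 0;

    void observe(String column, Observer o) { observers.put(column, o); }

    // A write to a watched column produces a notification.
    void notifyColumn(String row, String column) { notifications.add(row + ":" + column); }

    // Several workers race for the same notification; only the one whose
    // atomic remove succeeds runs the observer.
    void runWorkers(int workers, String row, String column) {
        for (int i = 0; i < workers; i++) {
            if (notifications.remove(row + ":" + column)) {
                observers.get(column).process(row, column);
                executions++;
            }
        }
    }
}
```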
Initialize bank
tx1.begin()
if(tx1.get('bob','balance') == null)
tx1.set('bob','balance',100)
if(tx1.get('joe','balance') == null)
tx1.set('joe','balance',100)
if(tx1.get('sue','balance') == null)
tx1.set('sue','balance',100)
tx1.commit()
What could possibly go wrong?
Two threads transferring
Thread 2 on node B
tx3.begin()
b3 = tx3.get('joe','balance')
b4 = tx3.get('sue','balance')
tx3.set('joe','balance',b3 + 5)
tx3.set('sue','balance',b4 - 5)
tx3.commit()
Thread 1 on node A
tx2.begin()
b1 = tx2.get('joe','balance')
b2 = tx2.get('bob','balance')
tx2.set('joe','balance',b1 + 7)
tx2.set('bob','balance',b2 - 7)
tx2.commit()
Accismus stochastic bank test
● Bank account per row
● Initialize N bank accounts with 1000
● Run random transfer threads
● Complete scan always sums to N*1000
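The invariant the stochastic test checks can be sketched as follows. This is a plain in-memory simulation, not Accismus: a synchronized method stands in for the atomicity that cross row transactions provide, and the check is that random transfers never change the total.

```java
import java.util.*;

// Sketch of the stochastic bank invariant: random transfers between
// accounts, executed atomically, preserve the total balance.
class BankInvariant {
    private final long[] balances;

    BankInvariant(int n, long initial) {
        balances = new long[n];
        Arrays.fill(balances, initial);
    }

    // synchronized stands in for a cross row transaction's atomicity
    synchronized void transfer(int from, int to, long amount) {
        balances[from] -= amount;
        balances[to] += amount;
    }

    synchronized long total() {
        return Arrays.stream(balances).sum();
    }

    // Run `rounds` random transfers and return the final total,
    // which should equal accounts * initial.
    static long run(int accounts, long initial, int rounds, long seed) {
        BankInvariant bank = new BankInvariant(accounts, initial);
        Random rand = new Random(seed);
        for (int i = 0; i < rounds; i++) {
            bank.transfer(rand.nextInt(accounts), rand.nextInt(accounts), rand.nextInt(50));
        }
        return bank.total();
    }
}
```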
Phrasecount example
● Have documents + source URI
● Dedupe documents based on SHA1
● Count number of unique documents each phrase occurs in
● Can do this with two map reduce jobs
● https://github.com/keith-turner/phrasecount
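The computation itself is simple to state in code. A sketch of the batch version (what the two map reduce jobs would produce): extract consecutive 4-word phrases, dedupe identical documents, and count the number of unique documents each phrase occurs in. The method names are illustrative.

```java
import java.util.*;

// Sketch of the phrasecount computation: 4-word phrases per document,
// documents deduped by content, phrases counted once per unique document.
class PhraseCount {
    // All consecutive 4-word phrases in a document.
    static List<String> phrases(String doc) {
        String[] w = doc.split("\\s+");
        List<String> out = new ArrayList<>();
        for (int i = 0; i + 4 <= w.length; i++) {
            out.add(String.join(" ", Arrays.copyOfRange(w, i, i + 4)));
        }
        return out;
    }

    // phrase -> number of unique documents containing it.
    static Map<String, Integer> docCounts(String... docs) {
        Map<String, Integer> counts = new HashMap<>();
        for (String doc : new HashSet<>(Arrays.asList(docs))) {   // dedupe documents
            for (String p : new HashSet<>(phrases(doc))) {        // once per document
                counts.merge(p, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Running this over the two example documents from the following slides ("my dog is very nice" loaded twice from different URIs, "his dog is very nice" once) yields "dog is very nice" : 2 and "my dog is very" : 1, matching the observer transactions shown there.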
Accismus Application
● Map Reduce + Bulk Import
● Load Transactions
● Observers
● Export Transactions
Load transaction 1
document:b4bf617e
my dog is very nice
http://foo.com/a
Load transaction 2
document:b4bf617e
my dog is very nice
http://foo.com/a http://foo.net/a
Load transaction 3
document:1e111475
his dog is very nice
document:b4bf617e
my dog is very nice
http://foo.com/a http://foo.net/a http://foo.com/c
Observer transaction 1
document:1e111475
his dog is very nice
document:b4bf617e
my dog is very nice
http://foo.com/a http://foo.net/a http://foo.com/c
my dog is very : 1
dog is very nice : 1
Observer transaction 2
document:1e111475
his dog is very nice
document:b4bf617e
my dog is very nice
http://foo.com/a http://foo.net/a http://foo.com/c
my dog is very : 1 his dog is very : 1
dog is very nice : 2
Load transaction 4
document:1e111475
his dog is very nice
document:b4bf617e
my dog is very nice
http://foo.com/a http://foo.net/a http://foo.com/c
my dog is very : 1 his dog is very : 1
dog is very nice : 2
Observer transaction 3
document:b4bf617e
my dog is very nice
http://foo.com/a http://foo.net/a http://foo.com/c
my dog is very : 1
dog is very nice : 1
Phrasecount schema
Row Column Value
uri:<uri> doc:hash <hash>
doc:<hash> doc:content <document>
doc:<hash> doc:refCount <int>
doc:<hash> index:check null
doc:<hash> index:status INDEXED | null
phrase:<phrase> stat:docCount <int>
Querying phrase counts
● Query Accismus directly
– Lazy recovery may significantly delay query
– High load may delay queries
● Export transactions write to an Accumulo table
– WARNING : leaving the sane world of transactions
– Faults during export
– Concurrently exporting same item
– Out of order arrival of exported data
Export transaction strategy
● Only export committed data (intent log)
– Don't export something a transaction is going to commit
● Idempotent
– Export transaction can fail
– Expect repeated execution (possibly concurrent)
● Use committed sequence # to order data
– Thread could read export data, pause, then export old data
– Use seq # as timestamp in Accumulo export table
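The seq-as-timestamp trick can be sketched as a last-writer-wins table keyed by sequence number. This simulation is not the Accismus exporter; it just shows why carrying the committed seq # makes exports idempotent and order-insensitive: a stale or repeated export can never overwrite a newer value, which is exactly what using the seq # as the Accumulo timestamp buys.

```java
import java.util.*;

// Sketch of the export strategy: each exported value carries the committed
// sequence number, and the table keeps the value with the highest seq
// (in Accumulo, using seq as the timestamp means newest-seq wins).
class ExportTable {
    // key -> {seq, value}
    private final Map<String, long[]> table = new HashMap<>();

    // Idempotent, order-insensitive write: apply only if seq is newer.
    void export(String key, long seq, long value) {
        long[] cur = table.get(key);
        if (cur == null || seq > cur[0]) {
            table.put(key, new long[] { seq, value });
        }
    }

    long get(String key) { return table.get(key)[1]; }
}
```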
Phrasecount export schema
Row Column Value
phrase:<phrase> export:check
phrase:<phrase> export:seq <int>
phrase:<phrase> export:sum <int>
phrase:<phrase> stat:sum <int>
Phrasecount problems
● No handling for high cardinality phrases
– Weak notifications mentioned in paper
– Multi-row tree another possibility
● Possible memory exhaustion
– Percolator uses many threads to get high throughput
– Example loads entire document into memory. Many threads X large documents == dead worker.
Weak notifications (Queue)
String pr = 'phrase:' + phrase;
if (isHighVolume(phrase)) {
  // avoid reading the contended column; spread increments across rand columns
  tx1.set(pr, 'stat:docCount' + rand, delta);
  tx1.weakNotify(pr); // trigger observer to collapse rand columns
} else {
  int current = tx1.get(pr, 'stat:docCount');
  tx1.set(pr, 'stat:docCount', current + delta);
}
Multi-row tree for high cardinality
phrase:<phrase>
phrase_0:<phrase>
phrase_1:<phrase>
phrase_00:<phrase>
phrase_01:<phrase>
phrase_10:<phrase>
phrase_11:<phrase>
● Incoming updates go to leaves
● Observers percolate to root
● Export from root
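The tree above can be sketched with a tiny simulation. Row names follow the slide's bit-prefix scheme; the percolate() method plays the role of an observer pass, folding each node's partial count into its parent until the total reaches the root row, which is what would be exported. This is an illustration of the idea, not Accismus code.

```java
import java.util.*;

// Sketch of the multi-row tree for a high-cardinality phrase: increments
// land on leaf rows (bit-prefix keys like "phrase_01"), and observer-style
// passes percolate partial sums up to the root row "phrase".
class PrefixTree {
    private final Map<String, Long> rows = new HashMap<>();

    // Incoming update: add delta to a leaf row, e.g. "phrase_01".
    void update(String leaf, long delta) {
        rows.merge(leaf, delta, Long::sum);
    }

    // One observer pass: fold every non-root node into its parent
    // ("phrase_01" -> "phrase_0", "phrase_0" -> "phrase").
    void percolate() {
        for (String node : new ArrayList<>(rows.keySet())) {
            if (node.equals("phrase")) continue;
            String parent = node.length() > "phrase_0".length()
                ? node.substring(0, node.length() - 1)
                : "phrase";
            rows.merge(parent, rows.remove(node), Long::sum);
        }
    }

    long root() { return rows.getOrDefault("phrase", 0L); }
}
```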
Timestamp Oracle
● Lightweight centralized service that issues timestamps
– Allocates batches of timestamps from ZooKeeper
– Gives batches of timestamps to nodes executing transactions
Timestamp oracle
● Gives logical global ordering to events
– Transactions get a timestamp at start. Only read data committed before.
– Transactions get a timestamp when committing.
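The batching described above can be sketched as follows. A plain field stands in for the durable ZooKeeper counter; the point is that the oracle pays one durable write per batch, not per timestamp, while still handing out strictly increasing values.

```java
// Sketch of the timestamp oracle: reserve batches of timestamps from a
// durable counter (ZooKeeper in Accismus) so only one durable write is
// needed per batch, while issued timestamps stay strictly increasing.
class TimestampOracle {
    private long durableCounter = 0;  // stands in for the ZooKeeper node
    private final int batchSize;
    private long next = 0, limit = 0;

    TimestampOracle(int batchSize) { this.batchSize = batchSize; }

    synchronized long nextTimestamp() {
        if (next == limit) {               // batch exhausted: reserve another
            durableCounter += batchSize;   // the one durable write per batch
            limit = durableCounter;
            next = limit - batchSize;
        }
        return next++;
    }
}
```

A crash loses at most one unissued batch, which is safe: the counter only moves forward, so restarted oracles never reissue a timestamp.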
Percolator Implementation
● Two phase commit using conditional mutations
– Write lock+data to primary row/column
– Write lock+data to all other row/columns
– Commit primary row/column if still locked
– Commit all other row/columns
● Lock fails if change between start and commit timestamp
● All row/columns in transaction point to primary
● In case of failure, primary is authority
● No centralized locking
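A stripped-down sketch of that commit protocol, under a simplified cell model (one lock and one last-commit timestamp per cell, no versioned data): the conditional check that a real conditional mutation performs server-side is modeled as "fail if the cell is locked or changed after my start timestamp". This illustrates the protocol shape, not the real implementation.

```java
import java.util.*;

// Sketch of Percolator-style two phase commit over a simplified cell model.
// Phase 1 conditionally locks each cell (primary first, others point to it);
// phase 2 commits the primary and rolls the rest forward.
class TwoPhaseCommit {
    static class Cell { long lastCommitTs = 0; String lock = null; }

    final Map<String, Cell> table = new HashMap<>();

    Cell cell(String rowCol) { return table.computeIfAbsent(rowCol, k -> new Cell()); }

    // Phase 1: the conditional-mutation check -- fail if the cell is locked
    // or was committed after this transaction's start timestamp.
    // (A real implementation would roll back partial locks via the primary;
    // that cleanup is omitted here.)
    boolean prewrite(long startTs, String... rowCols) {
        String primary = rowCols[0];
        for (String rc : rowCols) {
            Cell c = cell(rc);
            if (c.lock != null || c.lastCommitTs > startTs) return false; // conflict
            c.lock = primary;  // every lock points to the primary
        }
        return true;
    }

    // Phase 2: erase locks and record the commit timestamp, primary first.
    void commit(long commitTs, String... rowCols) {
        for (String rc : rowCols) {
            Cell c = cell(rc);
            c.lock = null;
            c.lastCommitTs = commitTs;
        }
    }
}
```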
Handling failures
● Transaction dies in phase 1
– Written some locks+data
– Must rollback
● Transaction dies in phase 2
– All locks+data written
– Roll forward and write data pointers
Transfer transaction
Row Column Percolator Type Time Value
bob balance write 1 0
bob balance data 0 100
joe balance write 1 0
joe balance data 0 100
Percolator appends column type to qualifier. Accismus uses high 4 bits of timestamp.
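A sketch of that high-4-bits encoding, assuming a 64-bit timestamp (the exact layout Accismus uses is an assumption here): the type occupies bits 60-63 and the real timestamp the low 60 bits, so typed cells of one column sort together by type and then by time.

```java
// Sketch of packing a column type into the high 4 bits of a long timestamp.
class PackedTs {
    static final int TYPE_BITS = 4;
    static final long TS_MASK = -1L >>> TYPE_BITS;  // low 60 bits

    // Combine a 4-bit type with a 60-bit timestamp.
    static long pack(int type, long ts) {
        return ((long) type << (64 - TYPE_BITS)) | (ts & TS_MASK);
    }

    static int type(long packed) { return (int) (packed >>> (64 - TYPE_BITS)); }

    static long timestamp(long packed) { return packed & TS_MASK; }
}
```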
Lock primary
Row Column Percolator Type Time Value
bob balance write 1 0
bob balance lock 3 bob:balance
bob balance data 3 93
bob balance data 0 100
joe balance write 1 0
joe balance data 0 100
Lock other
Row Column Percolator Type Time Value
bob balance write 1 0
bob balance lock 3 bob:balance
bob balance data 3 93
bob balance data 0 100
joe balance write 1 0
joe balance lock 3 bob:balance
joe balance data 3 107
joe balance data 0 100
Commit primary
Row Column Percolator Type Time Value
bob balance write 6 3
bob balance write 1 0
bob balance data 3 93
bob balance data 0 100
joe balance write 1 0
joe balance lock 3 bob:balance
joe balance data 3 107
joe balance data 0 100
What happens if a tx with start time 7 reads joe and bob?
Commit timestamp obtained after all locks written, why?
Commit other
Row Column Percolator Type Time Value
bob balance write 6 3
bob balance write 1 0
bob balance data 3 93
bob balance data 0 100
joe balance write 6 3
joe balance write 1 0
joe balance data 3 107
joe balance data 0 100
Garbage collection
● Not mentioned in paper
● Use compaction iterator
● Currently keep X versions. Could determine oldest active scan start timestamp.
● Must keep data about success/failure of primary column
– Added extra column type to indicate when primary can be collected. Never collected in failure case.
After GC Iterator
Row Column Percolator Type Time Value
bob balance write 6 3:TRUNC
bob balance write 1 0
bob balance data 3 93
bob balance data 0 100
joe balance write 6 3:TRUNC
joe balance write 1 0
joe balance data 3 107
joe balance data 0 100
Transaction with read time of 5 would see StaleScanException
Snapshot iterator
● Used to read data
● Analyzes Percolator metadata on tserver
● Returns committed data <= start OR open locks
● Detects scan past point of GC
– Client code throws StaleScanException
Accismus API
● Minimal byte buffer based API
– Currently byte sequence, plan to move to byte buffer. (could be your first patch :)
– remove all external dependencies, like Accumulo Range
● Wrap minimal API w/ convenience API that handles nulls, encoding, and types well.
// automatically encode strings and int into bytes using supplied encoder
tx.mutate().row("doc:" + hash).fam("doc").qual("refCount").set(5);

// no need to check if value is null and then parse as int
int rc = tx.get().row("doc:" + hash).fam("doc").qual("refCount").toInteger(0);
TODO
● test at scale
● create a cluster test suite
● weak notifications
● use YARN to run
● improve batching of reads and writes
● Initialization via M/R. Accismus file output format
● column read ahead based on past read patterns
● Improve GC
● Improve finding notifications
Collaborate
● https://github.com/keith-turner/Accismus
● Interested in building an Accismus application?
● Hope to have a feature complete alpha within a few months that can be stabilized