1
Performance services
Badri Nath, Rutgers University
badri@cs.rutgers.edu
1. Pig Latin
2. Dynamo
2
What’s happening?
Tera-, peta-, exabytes: 10^12, 10^15, 10^18
Efficient processing and analysis
Results
Clusters: thousands of machines
3
Workload is different
Huge data sets: terabytes and petabytes are common
Read-only data, scan-centric: applications need a run through the entire data set for analysis
Need to take advantage of parallelism (cluster hardware)
Very little dependency: no transactions, no index-based lookups
4
Pig & Pig Latin: a layer on top of map-reduce (Hadoop)
Pig is the system; Pig Latin is the query language; Pig Pen is a debugging environment
Pig Latin is a hybrid between:
a high-level declarative query language in the spirit of SQL
low-level, procedural programming à la map-reduce
Parts of the DB engine are exposed
Can we say that the DB community has gone hog wild!!!
5
Example beer table
name  age  beer      quality
A     23   amstel    0.5
B     34   bud       0.2
C     25   corona    0.7
D     42   bud       0.2
E     22   corona    0.7
F     27   heineken  0.6
G     25   amstel    0.5
H     26   corona    0.7
I     30   corona    0.7
6
Example
Table BeerPreference: (name, age, beer, quality)
Find, for each high-quality beer, the average age of the drinkers of that beer. In SQL:
SELECT beer, AVG(age)
FROM BeerPreference
WHERE quality > 0.2
GROUP BY beer
7
Example in Pig Latin
Same query in Pig Latin:
good_beer = FILTER beerpreference BY quality > 0.2;
beer_groups = GROUP good_beer BY beer;
output = FOREACH beer_groups GENERATE beer, AVG(good_beer.age);
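As a rough illustration, here is what the three statements compute, sketched in plain Python over the example table (variable names mirror the Pig Latin script; the sketch shows the dataflow's semantics, not Pig's execution):

```python
# Sketch of FILTER, GROUP, and FOREACH ... GENERATE on the beer table.
beerpreference = [
    ("A", 23, "amstel", 0.5), ("B", 34, "bud", 0.2),
    ("C", 25, "corona", 0.7), ("D", 42, "bud", 0.2),
    ("E", 22, "corona", 0.7), ("F", 27, "heineken", 0.6),
    ("G", 25, "amstel", 0.5), ("H", 26, "corona", 0.7),
    ("I", 30, "corona", 0.7),
]

# good_beer = FILTER beerpreference BY quality > 0.2;
good_beer = [t for t in beerpreference if t[3] > 0.2]

# beer_groups = GROUP good_beer BY beer;
beer_groups = {}
for t in good_beer:
    beer_groups.setdefault(t[2], []).append(t)

# output = FOREACH beer_groups GENERATE beer, AVG(good_beer.age);
output = {beer: sum(t[1] for t in grp) / len(grp)
          for beer, grp in beer_groups.items()}
print(output)
```

Note that this averages over every qualifying row, including row E (22, corona), so corona comes out at 25.75; the slides' grouped table happens to omit row E, which is why they show 27 for corona.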
8
Filter, GROUP

name  age  beer      quality
A     23   amstel    0.5
C     25   corona    0.7
E     22   corona    0.7
F     27   heineken  0.6
G     25   amstel    0.5
H     26   corona    0.7
I     30   corona    0.7

good_beer = FILTER beerpreference BY quality > 0.2;
beer_groups = GROUP good_beer BY beer;
9
beer_groups = (beer, good_beer)
corona: (C,25,corona,0.7), (H,26,corona,0.7), (I,30,corona,0.7)
heineken: (F,27,heineken,0.6)
amstel: (A,23,amstel,0.5), (G,25,amstel,0.5)
10
output
beer      avg(age)
corona    27
heineken  27
amstel    24
11
Example (in the paper)
Table urls: (url, category, pagerank)
Find, for each sufficiently large category, the average pagerank of high-pagerank urls in that category. In SQL:
SELECT category, AVG(pagerank)
FROM urls
WHERE pagerank > 0.2
GROUP BY category
HAVING COUNT(*) > 10^6
12
Example in Pig Latin
Same query in Pig Latin:
good_urls = FILTER urls BY pagerank > 0.2;
groups = GROUP good_urls BY category;
big_groups = FILTER groups BY COUNT(good_urls) > 10^6;
output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);
1. Sequence of steps; each step applies a transformation to (potentially huge) data
2. Data flow!
3. The user is specifying the query execution plan
4. High-level operations (filter, group, aggregate)
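The same four-step dataflow can be sketched in Python; the toy table and the group-size threshold of 2 (standing in for the paper's 10^6) are invented for illustration:

```python
# Sketch of filter -> group -> filter-groups-by-size -> aggregate.
urls = [
    ("a.com", "news",   0.9), ("b.com", "news",   0.4),
    ("c.com", "sports", 0.6), ("d.com", "news",   0.1),
    ("e.com", "sports", 0.1),
]

good_urls = [u for u in urls if u[2] > 0.2]          # FILTER BY pagerank > 0.2

groups = {}                                          # GROUP BY category
for u in good_urls:
    groups.setdefault(u[1], []).append(u)

big_groups = {c: g for c, g in groups.items() if len(g) >= 2}   # FILTER groups

output = {c: sum(u[2] for u in g) / len(g)           # GENERATE category, AVG
          for c, g in big_groups.items()}
print(output)
```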
13
Features
Pig is a data flow language
Data fed to high-level operations
Nested data model: forget 1NF!
UDFs, or user-defined functions: spam_urls = FILTER urls BY isSpam(url). Bugs!?!
Provides a debugging environment
Free format (schema is optional)
It can process anything
14
Nested data model
A given data item has multiple references; easy to capture that in a nested structure:
(key, (ref1, ref2, ref3))
Define complex, non-atomic data types: set, map, bag, etc.
Data, at least in web processing, is inherently nested
Suited for a data flow language
15
User Defined Functions
Example 2:
beer_groups = GROUP good_beer BY beer;
output = FOREACH beer_groups GENERATE beer, youngest(good_beer.age);

corona: {(C,25,corona,0.7), (H,26,corona,0.7), (I,30,corona,0.7)} → 25
heineken: {(F,27,heineken,0.6)} → 27
amstel: {(A,23,amstel,0.5), (G,25,amstel,0.5)} → 23
16
Data Model
Atom, e.g., `alice' or `corona'
Tuple: a sequence of fields, e.g., (`alice', `lakers'); (a, (a,25,corona,0.7))
Bag: a collection of tuples, e.g., { (`alice', `lakers'), (`alice', (`iPod', `apple')) }
Map, e.g., [ `fan of' → { (`lakers') (`iPod') }, `age' → 20 ]
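Pig's four types map naturally onto plain Python values; this is an assumed illustration (Pig itself is implemented in Java), using the examples from the slide:

```python
# Mirroring Pig's data model in Python containers.
atom = "corona"                                   # Atom: a simple value
tup = ("alice", ("lakers",))                      # Tuple: sequence of fields
bag = [("alice", "lakers"),                       # Bag: collection of tuples
       ("alice", ("iPod", "apple"))]              # (fields may be non-atomic)
mapping = {"fan of": [("lakers",), ("iPod",)],    # Map: key -> value, where the
           "age": 20}                             # value may itself be a bag
print(mapping["fan of"])
```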
17
Expressions in Pig Latin
18
Specifying Input Data
queries = LOAD `query_log.txt' USING myLoad() AS (userId, queryString, timestamp);
Need to create a bag for processing; LOAD returns a handle to a bag
myLoad() is the UDF that does data cleaning
USING specifies the deserializer
AS specifies the names for the fields in the bag
Optional: positions ($0, $1, …) can be used to refer to fields
Field by name or field by position
19
FOREACH
expanded_queries = FOREACH queries GENERATE userId, UPPERCASE(queryString);
Apply GENERATE to each tuple; each tuple is processed independently, so it is easy to parallelize
To remove one level of nesting:
expanded_queries = FOREACH queries GENERATE userId, FLATTEN(expandQuery(queryString));
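A small Python sketch of the FLATTEN semantics; expand_query here is a hypothetical UDF standing in for the slide's expandQuery:

```python
# FOREACH ... GENERATE with and without FLATTEN.
queries = [("u1", "lakers"), ("u2", "ipod")]

def expand_query(q):
    # Hypothetical UDF: returns a bag of query rewrites.
    return [q, q + " news"]

# Without FLATTEN: one output tuple per input; the second field is a bag.
nested = [(uid, expand_query(q)) for uid, q in queries]

# With FLATTEN: one output tuple per element of the bag (nesting removed).
flattened = [(uid, e) for uid, q in queries for e in expand_query(q)]
print(flattened)
```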
20
ForEach and Flattening
21
Flattening Example
T: (A, B, C), where B and C are bag-valued fields
[Figure: example rows of T for `angelina' and `brad' with nested bags, and the flattened outputs; the values are garbled in this transcript.]
ANSWER1 = FOREACH T GENERATE A, FLATTEN(B), C
ANSWER2 = FOREACH T GENERATE A, FLATTEN(C)

22
3.4 Filter
real_queries = FILTER queries BY userId neq `bot';
real_queries = FILTER queries BY NOT isBot(userId);
Oink, oink: basically a select. Pig operates on bags; SQL operates on tables.
23
Co-Group: groups two related data sets; for example:
results: (queryString, url, position)
revenue: (queryString, adSlot, amount)
grouped_data = COGROUP results BY queryString, revenue BY queryString;
Apply GROUP BY to each bag and produce another bag: (common attr, category1, category2)
url_revenues = FOREACH grouped_data GENERATE FLATTEN(distributeRevenue(results, revenue));
Co-Group is more flexible than SQL JOIN
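A sketch of COGROUP's output shape in Python, on invented toy rows: each key maps to one bag per input relation, and the bags stay separate (unlike a join):

```python
# COGROUP: per key, a bag from results and a bag from revenue.
results = [("q1", "a.com", 1), ("q1", "b.com", 2), ("q2", "c.com", 1)]
revenue = [("q1", "top", 0.5), ("q2", "side", 0.2)]

keys = {r[0] for r in results} | {v[0] for v in revenue}
grouped_data = {
    k: ([r for r in results if r[0] == k],    # bag of matching results
        [v for v in revenue if v[0] == k])    # bag of matching revenue rows
    for k in keys
}
print(grouped_data["q1"])
```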
24
CoGroup vs Join
25
Group (only one data set)
grouped_revenue = GROUP revenue BY queryString;
{(queryString1, bag), (queryString2, bag), …}
query_revenues = FOREACH grouped_revenue GENERATE queryString, SUM(revenue.amount) AS totalRevenue;
26
Join in Pig Latin
join_result = JOIN results BY queryString, revenue BY queryString;
Shorthand for:
temp_var = COGROUP results BY queryString, revenue BY queryString;
join_result = FOREACH temp_var GENERATE FLATTEN(results), FLATTEN(revenue);
The above is an equi-join
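The equivalence can be sketched in Python: an equi-join is a per-key cross product of the two cogrouped bags, flattened (toy rows invented for illustration):

```python
# Equi-join as cogroup + flatten: concatenate every pair of matching tuples.
results = [("q1", "a.com"), ("q1", "b.com"), ("q2", "c.com")]
revenue = [("q1", 0.5), ("q2", 0.2)]

keys = {r[0] for r in results} & {v[0] for v in revenue}
join_result = [
    r + v                       # flatten both tuples into one output tuple
    for k in keys
    for r in results if r[0] == k
    for v in revenue if v[0] == k
]
print(sorted(join_result))
```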
27
MapReduce in Pig Latin
map_result = FOREACH input GENERATE FLATTEN(map(*));
Map is a UDF that works on the documents' contents and produces a bag of key-value pairs
FLATTEN removes one level of nesting to produce [K2, V2]
key_groups = GROUP map_result BY $0;
output = FOREACH key_groups GENERATE reduce(*);
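Word count written in this map/flatten/group/reduce shape, as a Python sketch (the UDF names are mine):

```python
# Word count in the slide's map -> flatten -> group -> reduce shape.
docs = ["to be or not to be", "be here"]

def map_udf(doc):
    # UDF: emit a bag of (word, 1) pairs per document.
    return [(w, 1) for w in doc.split()]

map_result = [kv for d in docs for kv in map_udf(d)]   # FLATTEN(map(*))

key_groups = {}                                        # GROUP BY $0
for k, v in map_result:
    key_groups.setdefault(k, []).append(v)

output = {k: sum(vs) for k, vs in key_groups.items()}  # reduce(*) per group
print(output)
```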
28
Store
To materialize the result in a file:
STORE query_revenues INTO `myoutput' USING myStore();
myStore is a UDF
29
Map-Reduce Plan Compilation
Map tasks assign keys for grouping, and reduce tasks process a group at a time.
Compiler: converts each (CO)GROUP command in the logical plan into a distinct MapReduce job with its own map and reduce functions.
30
Debugging Environment
Programming is an iterative process.
A sandbox data set is generated automatically to show results for the expressions.
31
PIG is SQL for clusters
UDF debugging
Format errors
User-initiated optimization
Targets a new demographic of programmers: the cool type, who knows Python and map-reduce, and carries an iPod and iPhone
32
Dynamo: Amazon’s key-value store
SOSP 2007 paper
Giuseppe DeCandia, Deniz Hastorun, Madan Jampani, Gunavardhan Kakulapati, Avinash Lakshman, Alex Pilchin, Swaminathan Sivasubramanian, Peter Vosshall, and Werner Vogels
33
Amazon
World's largest online store
Approx. 85M users
Approx. 3-4M checkouts per day, $20B in revenues
A highly available data store
Downtime = $$$
Response time and latency are important for customers
User operations cannot be lost
Massive store of (key, value) pairs
34
Dynamo
Scalable approach to managing a persistent store
Design space: simple read and write operations on data
Trade off consistency for availability and latency
ACID vs. BASE
Runs on a geo-redundant cluster
The cluster is controlled by Amazon (hosts within the cluster can be trusted)
35
Architecture
S3: hosted storage service

36
Data Access Model
Simple API: put(key, object), get(key)
Data stored as (key, object) pairs
Handle or "identifier" generated as a hash of the object
Objects are opaque
Application examples: shopping carts, customer preferences, session management, sales rank, and product catalog
37
Design requirements
Just a key-value store, not a full-fledged database
Store small objects (size < 1 MB)
Operations on single objects
Similar to operations on one row in Bigtable
No constraints across shopping carts
38
Service level agreements
User experience is key
Provide < 300 ms response time for 99.9% of requests
Peak load of 500 requests/sec
Customer updates should not be rejected due to system problems
39
Dynamo uses many solutions known in distributed systems:
Replication: consistent hashing
Version reconciliation: vector clocks
Replication: sloppy quorum
Handling temporary failures: hinted handoff
Recovering from permanent failures: Merkle trees
Membership and failure detection: gossip, anti-entropy

40

Problem | Technique | Advantage
Partitioning | Consistent hashing | Incremental scalability
High availability for writes | Vector clocks with reconciliation during reads (eventual consistency) | Version size is decoupled from update rates
Handling temporary failures | `Sloppy' quorum and hinted handoff | Provides high availability and durability guarantee when some of the replicas are not available
Recovering from permanent failures | Anti-entropy using Merkle trees | Synchronizes divergent replicas in the background
Membership and failure detection | Gossip-based membership protocol and failure detection | Preserves symmetry and avoids having a centralized registry for storing membership and node liveness information
41
Partition Algorithm: Consistent hashing
Consistent hashing: the output range of a hash function is treated as a fixed circular space or "ring"
Advantage: incremental scalability
Each node keeps pointers to all nodes: O(N) state
42
Node hashes to multiple points on the ring
Uses vnodes: when a node enters the system, it hashes to multiple points on the ring
Multiple key ranges map to one node
Node fails: lose a fraction of the load
Node rejoins: absorbs load from all keys that map to this node
43
Replication at N different hosts
Each data item is replicated at N hosts: succ(k) plus the N-1 successor nodes
"Preference list": the list of nodes responsible for storing a particular key
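A toy Python version of the last two slides' partitioning scheme: virtual nodes on a hash ring, with a preference list of N distinct successors (node names, vnode count, and N are illustrative):

```python
# Consistent-hash ring with virtual nodes and a preference list.
import bisect
import hashlib

def h(s):
    # Hash a string to an integer point on the ring.
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

VNODES = 8
nodes = ["node-a", "node-b", "node-c", "node-d"]
ring = sorted((h(f"{n}#{i}"), n) for n in nodes for i in range(VNODES))
points = [p for p, _ in ring]

def preference_list(key, n=3):
    """Owner of key (first vnode clockwise), then next n-1 distinct nodes."""
    idx = bisect.bisect(points, h(key)) % len(ring)
    out = []
    while len(out) < n:
        node = ring[idx % len(ring)][1]
        if node not in out:                 # skip vnodes of already-chosen nodes
            out.append(node)
        idx += 1
    return out

plist = preference_list("cart:12345")
print(plist)    # first entry is the coordinator; entries are distinct nodes
```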
44
Data Versioning
Consistency model: eventual consistency
A put() call may return to its caller before the update has been applied at all the replicas
Data is immutable; each update creates a new version
A get() call may return many versions of the same object
Challenge: an object can have distinct version sub-histories, which the system will need to reconcile in the future
Solution: use vector clocks to capture causality between different versions of the same object
45
Logical Clocks
How to impose causality in a distributed system?
Solution: timestamp updates with logical clocks [Lamport]
Each site maintains a monotonically increasing clock value LC
Increment LC on each new event
Send LC along with any message sent
Receiver sets its LC = max(LC, received LC) + 1
Send happens-before receive
Clock condition: e1 < e2 implies LC(e1) < LC(e2)
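A minimal runnable sketch of these rules (one assumption: the receive rule is written as max(local LC, received LC) + 1, the usual form that makes the clock condition hold):

```python
# Minimal Lamport logical clock.
class LamportClock:
    def __init__(self):
        self.lc = 0

    def local_event(self):
        self.lc += 1                       # increment on each new event
        return self.lc

    def send(self):
        return self.local_event()          # timestamp carried on the message

    def receive(self, msg_lc):
        self.lc = max(self.lc, msg_lc) + 1 # receiver jumps past the sender
        return self.lc

a, b = LamportClock(), LamportClock()
t_send = a.send()            # a's clock: 1
t_recv = b.receive(t_send)   # b's clock: max(0, 1) + 1 = 2
assert t_send < t_recv       # send happens-before receive
```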
46
Logical clocks example
[Figure: logical clock values at four sites W, X, Y, Z as events occur and messages are exchanged.]
Note: LC(e1) < LC(e2) does not imply causality
47
Vector Clocks
Need to infer from clock values whether two events are related (can replicas be reconciled?)
In a system with N nodes, each site keeps a vector timestamp TS[N] as well as a logical clock LC.
TS[j] at site i is the most recent value of site j's logical clock that site i "heard about".
1. TSi[i] = LCi: each site i keeps its own LC in TS[i].
2. When site i generates a new event, it increments its logical clock: TSi[i] = TSi[i] + 1
3. A site r observing an event (e.g., receiving a message) from site s sets its TSr to the pairwise maximum of TSs and TSr: for each site i, TSr[i] = MAX(TSr[i], TSs[i])
48
Vector clocks & ordering
Use the vector timestamps at different sites:
e1 happened-before e2 if and only if TS(e2) dominates TS(e1)
e1 < e2 iff TS(e1)[i] <= TS(e2)[i] for each site i
"Every event or update visible when e1 occurred was also visible when e2 occurred."
If one version dominates, replace the old version
If two events are concurrent (neither dominates the other), reconcile
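The update rules and the dominance test can be sketched together in Python (function names are mine):

```python
# Vector clocks: per-site increment, pairwise max on receive, and a
# dominates() test used to decide "replace" vs "reconcile".
def new_ts(n):
    return [0] * n

def local_event(ts, i):
    ts[i] += 1                                 # rule 2: increment own entry

def on_receive(ts_r, ts_s, r):
    for i in range(len(ts_r)):                 # rule 3: pairwise maximum
        ts_r[i] = max(ts_r[i], ts_s[i])
    ts_r[r] += 1                               # receiving is itself an event

def dominates(a, b):
    """True iff b happened-before a (a saw everything b did, and more)."""
    return all(x >= y for x, y in zip(a, b)) and a != b

x, y = new_ts(3), new_ts(3)
local_event(x, 0)                  # x = [1, 0, 0]
on_receive(y, x, 1)                # y = [1, 1, 0]
assert dominates(y, x)             # y dominates: replace the old version
local_event(x, 0)                  # x = [2, 0, 0], now concurrent with y
assert not dominates(x, y) and not dominates(y, x)   # concurrent: reconcile
```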
49
Vector Clocks: Example
[Figure: object versions D1 (1,0,0) and D2 (2,0,0) written at site X; concurrent descendants D3 (2,1,0) at Y and D4 (2,0,1) at Z; neither dominates the other, so they must be reconciled into a new version (the remainder of the figure, versions D5-D7, is garbled in this transcript).]
50
Vector clock example
51
Executing put() and get()
Any node can process put() and get()
Option 1: requests directed to any node by a load balancer (application is agnostic of Dynamo)
Option 2: requests directed to the coordinator by a Dynamo-aware client that knows how to map a key to a node
52
‘Sloppy’ Quorum
Traditional quorum system: R/W is the minimum number of nodes that must participate in a successful read/write operation
Setting R + W > N yields a quorum-like system: at least one node is in the intersection of every read and write set
`Sloppy' quorum: set R + W <= N for more chance to skip slower nodes
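A quick brute-force check of the intersection claim (toy N, R, W values): with R + W > N every read set overlaps every write set, and with R + W <= N it need not:

```python
# Pigeonhole check: do all (write set, read set) pairs intersect?
from itertools import combinations

N = 3
replicas = range(N)

def always_overlaps(r, w):
    return all(
        set(ws) & set(rs)
        for ws in combinations(replicas, w)
        for rs in combinations(replicas, r)
    )

assert always_overlaps(2, 2)        # R + W = 4 > N: reads see the latest write
assert not always_overlaps(1, 2)    # R + W = 3 <= N: a read may miss it
```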
53
Handling failures: Hinted handoff
A node D is down, but D is required for the quorum
Move D's updates to another node D'
D' checks for D until D is up again
D' syncs with D
D' deletes the data that was originally meant for D
54
Handling permanent failures
Replica synchronization: anti-entropy
Each node periodically selects a random peer and runs:
  get(peer, peerstate)
  new_state = mergestate(peerstate, my_state)
  put(peer, new_state)
  my_state = new_state
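A runnable toy version of this loop, with node state modeled as a set of updates and mergestate as set union (an illustrative choice; Dynamo's real states are key ranges reconciled via Merkle trees):

```python
# Anti-entropy by repeated pairwise merge: all nodes converge to the union.
import random

states = {"n1": {"a"}, "n2": {"b"}, "n3": {"a", "c"}}

def anti_entropy_round(node):
    peer = random.choice([n for n in states if n != node])
    merged = states[node] | states[peer]      # mergestate(peerstate, my_state)
    states[peer] = merged                     # put(peer, new_state)
    states[node] = merged                     # my_state = new_state

for _ in range(10):                           # a few gossip rounds suffice here
    for n in list(states):
        anti_entropy_round(n)

assert states["n1"] == states["n2"] == states["n3"] == {"a", "b", "c"}
```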
55
Merkle Tree
A hash tree whose leaves are the hashes of the values of individual keys
Parent nodes are hashes of their children
Each branch can be checked independently, without downloading the entire tree
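A toy Merkle comparison in Python: build each replica's tree bottom-up, then walk from the root toward the single differing leaf, touching only one branch (assumes exactly one mismatched leaf and a power-of-two leaf count):

```python
# Merkle trees for two replicas; locate the one stale key range.
import hashlib

def h(data):
    return hashlib.sha1(data.encode()).hexdigest()

def build(leaves):
    # Bottom-up hash tree; returns levels with tree[0] = [root].
    level = [h(v) for v in leaves]
    tree = [level]
    while len(level) > 1:
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        tree.append(level)
    return tree[::-1]

def diff_leaf(a, b):
    # Walk down from the root, following the child whose hashes differ.
    idx = 0
    for depth in range(1, len(a)):
        idx *= 2
        if a[depth][idx] == b[depth][idx]:    # left child matches: go right
            idx += 1
    return idx

t1 = build(["k1=v1", "k2=v2", "k3=v3", "k4=v4"])
t2 = build(["k1=v1", "k2=v2", "k3=OLD", "k4=v4"])
assert t1[0] != t2[0]          # roots differ: replicas have diverged
print(diff_leaf(t1, t2))       # -> 2, the index of the "k3" leaf
```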
56
Merkle Tree
[Figure: two replicas with identical hash trees: root H1, internal nodes H2 and H3, leaves H4-H7. One replica's node crashes.]
57
Merkle Tree
[Figure: the crashed replica comes back with modified data; its tree is now root H10, internal nodes H2 and H9, leaves H4, H5, H8, H7. Compare roots: H1 vs. H10 differ.]

58
Merkle Tree
[Figure: same two trees. Roots differ, so compare the left subtrees: H2 vs. H2 match.]
59
Merkle Tree
[Figure: same two trees. Compare the right subtrees: H3 vs. H9 differ, so descend into that branch.]
60
Merkle Tree
[Figure: same two trees. At the leaf level, H6 vs. H8 mismatch: only that key range needs to be synchronized.]
61
Evaluation
62
Trading latency & durability
63
Load balance
64
Number of versions
99.94% of the time, only a single version was returned
Updates are well coordinated
A Dynamo-aware client does a better job at load balancing: latency is halved compared to a generic load balancer
65
Conclusions
Uses so many concepts from distributed systems design
Amazing that they have been able to put together a complex working system
Could a simpler design have worked just as well?
Difficult to get all the concepts related to consistency working in the same system