

1

Performance services

Badri Nath, Rutgers University
badri@cs.rutgers.edu

1. Pig Latin
2. Dynamo

2

What’s happening?

Tera-, peta-, exabytes of data (10^12, 10^15, 10^18 bytes)
Efficient processing and analysis produce results
Clusters: thousands of machines

3

Workload is different

Huge data sets: terabytes and petabytes are common
Read-only, scan-centric data: applications need a run through the entire data set for analysis
Need to take advantage of parallelism (cluster hardware)
Very little dependency, no transactions, no index-based lookups

4

Pig & Pig Latin

A layer on top of map-reduce (Hadoop)
Pig is the system; Pig Latin is the query language; Pig Pen is a debugging environment
Pig Latin is a hybrid between:
  a high-level declarative query language in the spirit of SQL
  low-level, procedural programming à la map-reduce
Parts of the DB engine are exposed

Can we say that the DB community has gone hog wild!!!


5

Example beer table

name  age  beer      quality
A     23   amstel    0.5
B     34   bud       0.2
C     25   corona    0.7
D     42   bud       0.2
E     22   corona    0.7
F     27   heineken  0.6
G     25   amstel    0.5
H     26   corona    0.7
I     30   corona    0.7

6

Example

Table BeerPreference: (name, age, beer, quality)
Find, for each high-quality beer, the average age of beer drinkers in that category. In SQL:

SELECT beer, AVG(age)
FROM BeerPreference
WHERE quality > 0.2
GROUP BY beer

7

Example in Pig Latin

Same query in Pig Latin:

good_beer = FILTER beerpreference BY quality > 0.2;
beer_groups = GROUP good_beer BY beer;
output = FOREACH beer_groups GENERATE beer, AVG(good_beer.age);

8

FILTER, GROUP

name  age  beer      quality
A     23   amstel    0.5
C     25   corona    0.7
E     22   corona    0.7
F     27   heineken  0.6
G     25   amstel    0.5
H     26   corona    0.7
I     30   corona    0.7

good_beer = FILTER beerpreference BY quality > 0.2;
beer_groups = GROUP good_beer BY beer;


9

beer_groups = (group, good_beer)

corona    {(C,25,corona,0.7), (H,26,corona,0.7), (I,30,corona,0.7)}
heineken  {(F,27,heineken,0.6)}
amstel    {(A,23,amstel,0.5), (G,25,amstel,0.5)}

10

output

beer      AVG(age)
corona    27
heineken  27
amstel    24

11

Example (in the paper)

Table urls: (url, category, pagerank)
Find, for each sufficiently large category, the average pagerank of high-pagerank urls in that category. In SQL:

SELECT category, AVG(pagerank)
FROM urls
WHERE pagerank > 0.2
GROUP BY category
HAVING COUNT(*) > 10^6

12

Example in Pig Latin

Same query in Pig Latin:

good_urls = FILTER urls BY pagerank > 0.2;
groups = GROUP good_urls BY category;
big_groups = FILTER groups BY COUNT(good_urls) > 10^6;
output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);

1. Sequence of steps; each step applies a transformation to (potentially huge) data
2. Data flow!!
3. The user is specifying the query execution plan
4. High-level operations (filter, group, aggregate)


13

Features

Pig is a data flow language: data is fed to high-level operations
Nested data model: forget 1NF!!
UDFs (user-defined functions), e.g., spam_urls = FILTER urls BY isSpam(url)
Bugs!?! Provides a debugging environment
Free format (schema is optional): it can process anything

14

Nested data model

A given data item has multiple references; it is easy to capture that in a nested structure:
  (key, (ref1, ref2, ref3))
Define complex, non-atomic data types: set, map, bag, etc.
Data, at least in web processing, is inherently nested
Well suited to a data flow language

15

User Defined Functions

Example 2:

beer_groups = GROUP good_beer BY beer;
output = FOREACH beer_groups GENERATE beer, youngest(good_beer.age);

corona    {(C,25,corona,0.7), (H,26,corona,0.7), (I,30,corona,0.7)}
heineken  {(F,27,heineken,0.6)}
amstel    {(A,23,amstel,0.5), (G,25,amstel,0.5)}

output:
corona    25
heineken  27
amstel    23
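The lecture treats youngest() as a user-defined aggregate. A minimal sketch in plain Python (not the real Pig UDF API, which is typically implemented in Java) of what such a function could compute over a bag of (name, age, beer, quality) tuples; the data below is taken from the example above:

# Illustrative only: youngest() as a plain Python function over a bag,
# where the bag is represented as a list of (name, age, beer, quality) tuples.
def youngest(bag):
    """Return the smallest age found in a bag of beer-preference tuples."""
    return min(age for (_name, age, _beer, _quality) in bag)

corona_bag = [("C", 25, "corona", 0.7), ("H", 26, "corona", 0.7), ("I", 30, "corona", 0.7)]
amstel_bag = [("A", 23, "amstel", 0.5), ("G", 25, "amstel", 0.5)]
print(youngest(corona_bag))  # 25
print(youngest(amstel_bag))  # 23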

16

Data Model

Atom: e.g., 'alice' or 'corona'
Tuple: a sequence of fields, e.g., ('alice', 'lakers'); (a, (a, 25, corona, 0.7))
Bag: a collection of tuples, e.g., { ('alice', 'lakers'), ('alice', ('iPod', 'apple')) }
Map: e.g., [ 'fan of' -> { ('lakers'), ('iPod') }, 'age' -> 20 ]
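One way to picture these four types is to map them onto Python values; this is an illustrative analogy only, not Pig's internal representation, and the sample values mirror the examples above:

atom = "alice"                               # Atom: a simple value such as a string or number
tup = ("a", ("a", 25, "corona", 0.7))        # Tuple: a sequence of fields, which may themselves nest
bag = [("alice", "lakers"),                  # Bag: a collection of tuples; the tuples
       ("alice", ("iPod", "apple"))]         #      are not required to share a schema
mp = {"fan of": [("lakers",), ("iPod",)],    # Map: keys mapped to values, where a value
      "age": 20}                             #      can itself be a bag or an atom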


17

Expressions in Pig Latin

18

Specifying Input Data

queries = LOAD 'query_log.txt'
          USING myLoad()
          AS (userId, queryString, timestamp);

Need to create a bag for processing; LOAD returns a handle to a bag
myLoad() is a UDF that does data cleaning; USING specifies the deserializer
AS specifies the names for the fields in the bag
  Optional: positions ($0, $1, ...) can be used to refer to fields
  Fields can be referenced by name or by position

19

FOREACH

expanded_queries = FOREACH queries GENERATE userId, UPPERCASE(queryString);

Apply GENERATE to each tuple; each tuple is processed independently, so it is easy to parallelize

To remove one level of nesting:

expanded_queries = FOREACH queries GENERATE userId, FLATTEN(expandQuery(queryString));
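A minimal Python sketch of what FLATTEN over one bag-valued field does: every element of the bag becomes its own output tuple, cross-produced with the tuple's remaining fields. The function name flatten_field and the sample data are assumptions made for illustration:

# Sketch of FLATTEN on a bag-valued field: each element of the bag becomes
# its own output tuple, paired with the remaining fields of the input tuple.
def flatten_field(rows, bag_index):
    out = []
    for row in rows:
        bag = row[bag_index]
        rest = row[:bag_index] + row[bag_index + 1:]
        for element in bag:
            out.append(rest[:bag_index] + tuple(element) + rest[bag_index:])
    return out

T = [("angelina", [("A", 2), ("B", 4)], [(1, 2), (3, 4)])]
print(flatten_field(T, 1))
# [('angelina', 'A', 2, [(1, 2), (3, 4)]), ('angelina', 'B', 4, [(1, 2), (3, 4)])]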

20

ForEach and Flattening


21

Flattening Example

[Figure: a table T with columns A, B, C, where A holds names (angelina, brad) and B and C hold bags of pairs, together with the outputs of:]

ANSWER1 = FOREACH T GENERATE A, FLATTEN(B), C
ANSWER2 = FOREACH T GENERATE A, FLATTEN(C)

22

3.4 Filter

real_queries = FILTER queries BY userId neq 'bot';

real_queries = FILTER queries BY NOT isBot(userId);

Oink, oink: basically a SELECT. Pig operates on bags, SQL operates on tables.

23

Co-Group

Groups two related data sets; for example:
  results: (queryString, url, position)
  revenue: (queryString, adSlot, amount)

grouped_data = COGROUP results BY queryString, revenue BY queryString;

Applies a GROUP BY to each input and produces, per key, one tuple of the form (common attribute, bag from results, bag from revenue)

url_revenues = FOREACH grouped_data GENERATE FLATTEN(distributeRevenue(results, revenue));

Co-Group is more flexible than SQL JOIN
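A rough Python sketch of what COGROUP computes, assuming the two relations and field order shown above (the helper name cogroup and the sample rows are invented for illustration): for each key it emits one tuple holding the key and one bag per input relation.

from collections import defaultdict

# Sketch of COGROUP: for each key, collect one bag per input relation.
def cogroup(results, revenue, key=lambda t: t[0]):
    groups = defaultdict(lambda: ([], []))
    for t in results:
        groups[key(t)][0].append(t)
    for t in revenue:
        groups[key(t)][1].append(t)
    # One output tuple per key: (key, bag_of_results, bag_of_revenue)
    return [(k, bags[0], bags[1]) for k, bags in groups.items()]

results = [("lakers", "nba.com", 1), ("lakers", "espn.com", 2)]
revenue = [("lakers", "top", 50), ("lakers", "side", 20)]
print(cogroup(results, revenue))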

24

CoGroup vs Join


25

Group (only one data set)

grouped_revenue = GROUP revenue BY queryString;
  yields {(queryString1, bag), (queryString2, bag), ...}
query_revenues = FOREACH grouped_revenue GENERATE queryString, SUM(revenue.amount) AS totalRevenue;

26

Join in Pig Latin

join_result = JOIN results BY queryString, revenue BY queryString;

Shorthand for:

temp_var = COGROUP results BY queryString, revenue BY queryString;
join_result = FOREACH temp_var GENERATE FLATTEN(results), FLATTEN(revenue);

The above is an equi-join

27

MapReduce in Pig Latin

map_result = FOREACH input GENERATE FLATTEN(map(*));

map is a UDF that works on whole documents (their contents) and produces a bag of key-value pairs; FLATTEN removes one level of nesting to produce (k2, v2) pairs

key_groups = GROUP map_result BY $0;
output = FOREACH key_groups GENERATE reduce(*);
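The pattern above is just map, group by key, reduce. A small Python analogue, with word count standing in for the map and reduce UDFs (all names and data here are made up for illustration):

from collections import defaultdict

# Python analogue of: FOREACH input GENERATE FLATTEN(map(*));
#                     GROUP ... BY $0;  FOREACH ... GENERATE reduce(*)
def map_fn(document):
    # Emits (key, value) pairs; here: word count.
    return [(word, 1) for word in document.split()]

def reduce_fn(key, values):
    return (key, sum(values))

documents = ["to be or not to be"]
pairs = [kv for doc in documents for kv in map_fn(doc)]   # map + flatten
groups = defaultdict(list)
for k, v in pairs:                                        # group by $0 (the key)
    groups[k].append(v)
print([reduce_fn(k, vs) for k, vs in groups.items()])     # reduce per group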

28

Store

To materialize a result in a file:

STORE query_revenues INTO 'myoutput' USING myStore();

myStore() is a UDF


29

Map-Reduce Plan Compilation

Map tasks assign keys for grouping, and the reduce tasks process a group at a time.
Compiler: converts each (CO)GROUP command in the logical plan into a distinct MapReduce job consisting of its own map and reduce functions.

30

Debugging Environment

Programming is an iterative process.
A sandbox data set is generated automatically to show results for the expressions.

31

PIG is SQL for clusters

UDF debugging
Format errors
User-initiated optimization
Targets a new demographic of programmers: the cool type, who knows Python and map-reduce and carries an iPod and iPhone

32

Dynamo: Amazon’s key-value store

SOSP 2007 paper
Giuseppe DeCandia, Deniz Hastorun, Madan Jampani, Gunavardhan Kakulapati, Avinash Lakshman, Alex Pilchin, Swaminathan Sivasubramanian, Peter Vosshall, and Werner Vogels


33

Amazon

World's largest online store
Approx. 85 M users
Approx. 3-4 M checkouts per day, $20B in revenue
Needs a highly available data store
  Downtime costs $$$
  Response time and latency matter to customers
  User operations cannot be lost
A massive store of (key, value) pairs

34

Dynamo

A scalable approach to managing a persistent store
Design space:
  Simple read and write operations on data
  Trade off consistency for availability and latency (ACID vs. BASE)
Runs on a geo-redundant cluster
The cluster is controlled by Amazon (hosts within the cluster can be trusted)

35

Architecture

S3: hosted storage service

36

Data Access Model

Simple API: put(key, object), get(key)
Data stored as (key, object) pairs
A handle or "identifier" is generated as a hash of the key
Objects are opaque to the store
Application examples: shopping carts, customer preferences, session management, sales rank, and product catalog
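A toy, single-node illustration of this interface in Python, using an in-memory dict; the class name is invented, and the real system also threads version/context information through get and put, which is omitted here. Hashing the key with MD5 follows the paper.

import hashlib

# Toy single-node view of Dynamo's interface: opaque objects stored under keys.
class TinyStore:
    def __init__(self):
        self._data = {}

    def put(self, key, obj):
        # Keys are hashed to fixed-size identifiers; objects are opaque bytes.
        self._data[hashlib.md5(key.encode()).hexdigest()] = obj

    def get(self, key):
        return self._data.get(hashlib.md5(key.encode()).hexdigest())

store = TinyStore()
store.put("cart:alice", b"...serialized shopping cart...")
print(store.get("cart:alice"))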


37

Design requirements

Just a key-value store, not a full-fledged database
Stores small objects (size < 1 MB)
Operations on single objects
  Similar to operations on one row in BigTable
  No constraints across shopping carts

38

Service level agreements

User experience is key
Provide < 300 ms response time for 99.9% of requests at a peak load of 500 requests/sec
Customer updates should not be rejected due to system problems

39

Dynamo: uses all the solutions known in distributed systems

Replication: consistent hashing
Version reconciliation: vector clocks
Replication: sloppy quorum
Handling temporary failures: hinted handoff
Recovering from permanent failures: Merkle trees
Membership and failure detection: gossip, anti-entropy

40

Summary of techniques (Problem: Technique -> Advantage)

Partitioning: consistent hashing -> incremental scalability.
High availability for writes: eventual consistency, vector clocks with reconciliation during reads -> version size is decoupled from update rates.
Handling temporary failures: 'sloppy' quorum and hinted handoff -> provides a high availability and durability guarantee when some of the replicas are not available.
Recovering from permanent failures: anti-entropy using Merkle trees -> synchronizes divergent replicas in the background.
Membership and failure detection: gossip-based membership protocol and failure detection -> preserves symmetry and avoids having a centralized registry for storing membership and node liveness information.


41

Partition Algorithm: Consistent hashing

Consistent hashing: the output range of a hash function is treated as a fixed circular space or "ring".
Advantage: incremental scalability
Each node keeps pointers to all other nodes: O(N) state per node

42

Node hashes to multiple points on the ring

Uses virtual nodes (vnodes): when a node joins the system, it hashes to multiple points on the ring
Multiple key ranges map to one physical node
When a node fails, its load is dispersed: each remaining node picks up only a fraction of it
When a node rejoins, it absorbs load from all the key ranges that map to it

43

Replication at N different hosts

Each data item is replicated at N hosts: succ(k) plus the N-1 following successor nodes
"Preference list": the list of nodes responsible for storing a particular key
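A compact Python sketch of the ring mechanics in the last three slides: each physical node hashes to several virtual points, a key's coordinator is the first point clockwise from hash(key), and the preference list is that node plus the next N-1 distinct successors. The class name, parameter values, and node names are illustrative; MD5 on the key follows the paper.

import bisect
import hashlib

def h(s):
    # 128-bit position on the ring (Dynamo applies MD5 to the key).
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class Ring:
    def __init__(self, nodes, vnodes=8, n_replicas=3):
        self.n = n_replicas
        # Each physical node is hashed to several points ("virtual nodes").
        self.points = sorted((h(f"{node}#{i}"), node)
                             for node in nodes for i in range(vnodes))

    def preference_list(self, key):
        # Walk clockwise from hash(key); collect N distinct physical nodes.
        idx = bisect.bisect(self.points, (h(key), ""))
        prefs = []
        while len(prefs) < self.n:
            _, node = self.points[idx % len(self.points)]
            if node not in prefs:
                prefs.append(node)
            idx += 1
        return prefs  # first entry is the coordinator, the rest are replicas

ring = Ring(["A", "B", "C", "D"])
print(ring.preference_list("cart:alice"))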

44

Data Versioning

Consistency model: eventual consistency
A put() call may return to its caller before the update has been applied at all the replicas
Data is immutable; each update creates a new version
A get() call may return many versions of the same object
Challenge: an object may have distinct version sub-histories, which the system will need to reconcile in the future
Solution: use vector clocks to capture causality between different versions of the same object


45

Logical Clocks

How to impose causality in a distributed system?
Solution: timestamp updates with logical clocks [Lamport]

Each site maintains a monotonically increasing clock value LC
Increment LC on each new event
Send LC along with any message sent
On receipt, the receiver sets its LC = max(LC, received LC) + 1
Send happens-before receive
Clock condition: e1 < e2 implies LC(e1) < LC(e2)
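A minimal Python sketch of a Lamport clock following the rules above (local event: increment; send: increment and attach LC; receive: take the maximum and add one). The class and method names are invented for illustration.

class LamportClock:
    def __init__(self):
        self.lc = 0

    def local_event(self):
        self.lc += 1                          # each new event increments LC
        return self.lc

    def send(self):
        self.lc += 1                          # sending is an event; piggyback LC on the message
        return self.lc

    def receive(self, msg_lc):
        self.lc = max(self.lc, msg_lc) + 1    # receiver jumps past the sender's clock
        return self.lc

x, y = LamportClock(), LamportClock()
t = x.send()     # x's clock becomes 1
y.receive(t)     # y's clock becomes 2: send happens-before receive, and LC reflects it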

46

Logical clocks example

[Figure: a space-time diagram of four processes X, Y, Z, W, with the logical clock value attached to each event and message]
Note: LC(e1) < LC(e2) does not imply that e1 causally precedes e2

47

Vector Clocks

Need to infer from clock values whether two events are related (can replicas be reconciled?)
In a system with N nodes, each site keeps a vector timestamp TS[N] as well as a logical clock LC.

1. TS[j] at site i is the most recent value of site j's logical clock that site i "heard about"; TSi[i] = LCi, i.e., each site i keeps its own LC in TS[i].
2. When site i generates a new event, it increments its logical clock: TSi[i] = TSi[i] + 1
3. A site r observing an event (e.g., receiving a message) from site s sets its TSr to the pairwise maximum of TSs and TSr: for each site i, TSr[i] = MAX(TSr[i], TSs[i])

48

Vector clocks & ordering

Use the vector timestamps at the different sites:
e1 happened-before e2 if and only if TS(e2) dominates TS(e1):
  e1 < e2 iff TS(e1)[i] <= TS(e2)[i] for each site i
"Every event or update visible when e1 occurred was also visible when e2 occurred."
If one version's timestamp dominates the other's, replace the old version.
If two events are concurrent (neither dominates the other), then reconcile.
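A short Python sketch of vector timestamps and the dominance test just described. The class and function names are invented for illustration, and the example timestamps are borrowed from the vector-clock example that follows.

class VectorClock:
    def __init__(self, n_sites, site_id):
        self.site = site_id
        self.ts = [0] * n_sites

    def local_event(self):
        self.ts[self.site] += 1                  # rule 2: increment own entry
        return list(self.ts)

    def merge(self, other_ts):
        # rule 3: pairwise maximum with the sender's timestamp,
        # then count the receive as a local event (rule 2)
        self.ts = [max(a, b) for a, b in zip(self.ts, other_ts)]
        self.ts[self.site] += 1
        return list(self.ts)

def dominates(ts1, ts2):
    # ts2 supersedes ts1 iff ts2 >= ts1 componentwise (and they differ)
    return all(a <= b for a, b in zip(ts1, ts2)) and ts1 != ts2

d1, d2, d3 = [2, 0, 0], [2, 1, 0], [2, 0, 1]
print(dominates(d1, d2))                          # True: D2 supersedes D1, replace the old version
print(dominates(d2, d3) or dominates(d3, d2))     # False: concurrent versions, must reconcile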


49

Vector Clocks: Example

[Figure: versions D1..D7 of one object written at sites X, Y, Z (and a fourth site), annotated with vector clocks such as (1,0,0), (2,0,0), (2,1,0), (2,0,1), (2,1,0,0), (2,0,1,0); versions whose clocks do not dominate one another, e.g., (2,1,0) and (2,0,1), are concurrent and must be reconciled]

50

Vector clock example

51

Executing put() and get()

Any node can process a put() or get(); requests reach a node in one of two ways:
  Directed to any node by a load balancer (the application stays agnostic of Dynamo)
  Directed to the coordinator by a Dynamo-aware client (the client knows how to map a key to a node)

52

‘Sloppy’ Quorum

Traditional quorum system: R (W) is the minimum number of nodes that must participate in a successful read (write) operation.
Setting R + W > N yields a quorum-like system: there is at least one node in the intersection of every read set and write set.

'Sloppy' quorum: set R + W <= N, giving more chance to skip slower nodes
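A tiny brute-force check of that intersection property in Python (the function name and parameter choices are illustrative): with R + W > N every read set overlaps every write set, while R + W <= N allows them to miss each other.

from itertools import combinations

def read_write_sets_always_intersect(n, r, w):
    """True iff every R-subset of the N replicas overlaps every W-subset."""
    replicas = range(n)
    return all(set(rs) & set(ws)
               for rs in combinations(replicas, r)
               for ws in combinations(replicas, w))

print(read_write_sets_always_intersect(3, 2, 2))  # True:  R + W > N, quorum-like
print(read_write_sets_always_intersect(3, 1, 1))  # False: R + W <= N, 'sloppy' setting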


53

Handling failures: Hinted handoff

A node D is down, but D is required for the quorum
Updates meant for D are moved to another node D'
D' periodically checks whether D is up again
When D recovers, D' syncs with D
D' then deletes the data that was originally meant for D
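A rough Python sketch of that handoff; the class and method names are invented, and in the paper hinted replicas are kept in a separate local database that is scanned periodically.

# Rough sketch of hinted handoff: D' accepts writes meant for the down node D,
# tags them with a hint, and hands them back once D is reachable again.
class Node:
    def __init__(self, name):
        self.name = name
        self.data = {}      # key -> value stored for this node itself
        self.hinted = {}    # intended_node -> {key: value} held temporarily

    def put(self, key, value, intended_for=None):
        if intended_for and intended_for != self.name:
            self.hinted.setdefault(intended_for, {})[key] = value  # keep the hint
        else:
            self.data[key] = value

    def hand_off(self, recovered_node):
        # Once the hinted node is up again, send the data back and delete it locally.
        for key, value in self.hinted.pop(recovered_node.name, {}).items():
            recovered_node.put(key, value)

d, d_prime = Node("D"), Node("D'")
d_prime.put("cart:alice", b"v1", intended_for="D")   # D is down, D' covers the write
d_prime.hand_off(d)                                  # D recovers; D' syncs and cleans up
print(d.data)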

54

Handling permanent failures

Replica synchronization: anti-entropy
Each node periodically selects a random peer and exchanges state:

  peer_state = get(peer)
  new_state = merge_state(peer_state, my_state)
  put(peer, new_state)
  my_state = new_state

55

Merkle Tree

A hash tree where the leaves are hashes of the values of individual keys
Parent nodes are hashes of their children
Each branch can be checked independently without downloading the entire tree
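A compact Python sketch of the structure: leaves hash individual key values, parents hash their children, and equal roots mean the replicas already agree. The function names and sample data are illustrative; for brevity this version compares leaves directly rather than descending branch by branch, and it assumes a power-of-two number of leaves.

import hashlib

def h(data):
    return hashlib.sha1(data).hexdigest()

def build_merkle(values):
    """Build a Merkle tree bottom-up; values is an ordered list of leaf byte-strings."""
    level = [h(v) for v in values]                   # leaves: hashes of individual key values
    tree = [level]
    while len(level) > 1:
        level = [h((level[i] + level[i + 1]).encode())
                 for i in range(0, len(level), 2)]   # parent = hash of its two children
        tree.append(level)
    return tree                                      # tree[-1][0] is the root hash

def diff_leaves(tree_a, tree_b):
    """Return indices of leaves whose hashes differ (equal roots => nothing to sync)."""
    if tree_a[-1][0] == tree_b[-1][0]:
        return []
    return [i for i, (a, b) in enumerate(zip(tree_a[0], tree_b[0])) if a != b]

replica1 = build_merkle([b"k1=v1", b"k2=v2", b"k3=v3", b"k4=v4"])
replica2 = build_merkle([b"k1=v1", b"k2=v2", b"k3=XX", b"k4=v4"])
print(diff_leaves(replica1, replica2))   # [2] -> only the third key range needs exchanging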

56

Merkle Tree

[Figure: two replicas, each with an identical Merkle tree: root H1, children H2 and H3, leaves H4, H5, H6, H7; then one replica node crashes]


57

Merkle Tree

[Figure: the crashed replica comes back, but its copy has been modified: its tree is now root H10 with children H2, H9 and leaves H4, H5, H8, H7, while the healthy replica still has root H1. Compare the roots: H1 vs H10 differ]

58

Merkle Tree

[Figure: compare the left subtrees: H2 matches H2, so the left halves of the two replicas are already in sync]

59

Merkle Tree

[Figure: compare the right subtrees: H3 vs H9 differ, so descend into the right subtree]

60

Merkle Tree

[Figure: at the leaf level, H6 vs H8 mismatch, pinpointing exactly which keys need to be exchanged]


61

Evaluation

62

Trading latency & durability

63

Load balance

64

Number of versions

99.94% of the time only a single version is observed
Updates are well coordinated

A Dynamo-aware client does a better job at load balancing: latency is halved compared to a generic load balancer


65

Conclusions

Uses so many concepts from distributed systems design
Amazing that they have been able to put together a complex working system
Could a simpler design have worked just the same?
Difficult to get all the concepts related to consistency working in the same system