Building Data Products at LinkedIn with DataFu

©2013 LinkedIn Corporation. All Rights Reserved.

Uploaded by matthew-hayes on 10-May-2015.

Page 1: Building Data Products at LinkedIn with DataFu

Page 2:

Matthew Hayes
Staff Software Engineer
www.linkedin.com/in/matthewterencehayes/

Page 3:

Tools of the trade

Page 4:

What tools do we use?

Languages: Java (MapReduce), Pig, R, Hive, Crunch

Systems: Voldemort, Kafka, Azkaban

Page 5:

Pig: Usually the language of choice

A high-level data flow language that produces MapReduce jobs. Used extensively at LinkedIn for building data products. Why?

– Concise (compared to Java)
– Expressive
– Mature
– Easy to use and understand
– More approachable than Java for some
– Easy to learn if you know SQL
– Easy to learn even if you don't know SQL
– Extensible through UDFs
– Reports task statistics

Page 6:

Pig: Extensibility

Several types of UDFs you can write:
– Eval
– Algebraic
– Accumulator

We do this a lot. Over time we accumulated a lot of useful UDFs. Decided to open source them as the DataFu library.

Page 7:

DataFu
Collection of UDFs for Pig

Page 8:

DataFu: History

Several teams were developing UDFs. But:

– Not centralized in one library
– Not shared
– No automated tests

Solution:
– Packaged UDFs in the DataFu library
– Automated unit tests, mostly through PigUnit

Started out as an internal project. Open sourced September 2011.

Page 9:

DataFu Examples
Collection of UDFs for Pig

Page 10:

DataFu: Assert UDF

About as simple as a UDF gets: it blows up when it encounters zero. A convenient way to validate assumptions about data. What if member IDs can't and shouldn't be negative? Assert on this condition:

data = filter data by ASSERT((memberId >= 0 ? 1 : 0), 'member ID was negative, doh!');

Implementation:

public Boolean exec(Tuple tuple) throws IOException {
  if ((Integer) tuple.get(0) == 0) {
    if (tuple.size() > 1)
      throw new IOException("Assertion violated: " + tuple.get(1).toString());
    else
      throw new IOException("Assertion violated.");
  }
  else
    return true;
}

Page 11:

DataFu: Coalesce UDF

Using ternary operators is fairly common in Pig. Replace null values with zero:

data = FOREACH data GENERATE (val IS NOT NULL ? val : 0) as result;

Return first non-null value among several fields:

data = FOREACH data GENERATE (val1 IS NOT NULL ? val1 : (val2 IS NOT NULL ? val2 : (val3 IS NOT NULL ? val3 : NULL))) as result;

Unfortunately, out of the box there's no better way to do this in Pig.

Page 12:

DataFu: Coalesce UDF

Simplify the code using the Coalesce UDF from DataFu:
– Behaves the same as COALESCE in SQL

Replace any null value with 0:

data = FOREACH data GENERATE Coalesce(val,0) as result;

Return first non-null value:

data = FOREACH data GENERATE Coalesce(val1,val2,val3) as result;

Implementation:

public Object exec(Tuple input) throws IOException {
  if (input == null || input.size() == 0)
    return null;
  for (Object o : input) {
    if (o != null)
      return o;
  }
  return null;
}

Page 13:

DataFu: In UDF

Suppose we want to filter some data based on a field equalling one of many values.

Can chain together conditional checks using OR:

data = LOAD 'input' using PigStorage(',') AS (what:chararray, adj:chararray);
dump data;
-- (roses,red)
-- (violets,blue)
-- (sugar,sweet)

data = FILTER data BY adj == 'red' OR adj == 'blue';
dump data;
-- (roses,red)
-- (violets,blue)

As the number of items grows this really becomes a pain.

Page 14:

DataFu: In UDF

Much simpler using the In UDF:

data = FILTER data BY In(adj,'red','blue');

Implementation:

public Boolean exec(Tuple input) throws IOException {
  Object o = input.get(0);
  Boolean match = false;
  if (o != null) {
    for (int i = 1; i < input.size() && !match; i++) {
      match = match || o.equals(input.get(i));
    }
  }
  return match;
}

Page 15:

DataFu: CountEach UDF

Suppose we have a system that recommends items to users. We've tracked what items have been recommended:

items = FOREACH items GENERATE memberId, itemId;

• Let's count how many times each item has been shown to a user.
• Desired output schema:

{memberId: int, items: {(itemId: long, cnt: long)}}

Page 16:

DataFu: CountEach UDF

Typically, we would first count (member,item) pairs:

items = GROUP items BY (memberId,itemId);
items = FOREACH items GENERATE group.memberId as memberId, group.itemId as itemId, COUNT(items) as cnt;

Then we would group again on member:

items = GROUP items BY memberId;
items = FOREACH items GENERATE group as memberId, items.(itemId,cnt) as items;

• But, this requires two MapReduce jobs!

Page 17:

DataFu: CountEach UDF

Using the CountEach UDF, we can accomplish the same thing with one MR job and much less code:

items = FOREACH (GROUP items BY memberId) GENERATE group as memberId, CountEach(items.(itemId)) as items;

• Not only is it more concise, but it has better performance:
– Wall clock time: 50% reduction
– Total task time: 33% reduction
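The counting CountEach performs per group can be sketched in plain Java. This is illustrative only: the bag of recommended items for one member is modeled as a list of item IDs, and the class and method names here are hypothetical (the real UDF operates on Pig DataBags).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CountEachSketch {
    // Count occurrences of each item, preserving first-seen order --
    // analogous to what CountEach does for a single member's bag of items.
    static Map<Long, Long> countEach(List<Long> itemIds) {
        Map<Long, Long> counts = new LinkedHashMap<>();
        for (Long id : itemIds) {
            counts.merge(id, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Items shown to one member, possibly repeated
        List<Long> shown = List.of(10L, 20L, 10L, 30L, 10L);
        System.out.println(countEach(shown)); // {10=3, 20=1, 30=1}
    }
}
```

Because the counting happens inside a single GROUP BY memberId pass, only one MapReduce job is needed, which is where the time savings above come from.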

Page 18:

DataFu: Session Statistics

Session: a period of sustained user activity. Suppose we have a stream of user clicks:

pv = LOAD 'pageviews.csv' USING PigStorage(',') AS (memberId:int, time:long, url:chararray);

What session length statistics are we interested in?
– Median
– Variance
– Percentiles (90th, 95th)

How will we define a session?
– In this example: no gaps in activity greater than 10 minutes

Page 19:

DataFu: Session Statistics

Define our UDFs:

DEFINE Sessionize datafu.pig.sessions.Sessionize('10m');

DEFINE Median datafu.pig.stats.StreamingMedian();
DEFINE Quantile datafu.pig.stats.StreamingQuantile('0.90','0.95');
DEFINE VAR datafu.pig.stats.VAR();
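For intuition about what these statistics mean, here is a plain-Java sketch that computes them exactly on a small sorted sample. Note this is illustrative only: DataFu's StreamingMedian and StreamingQuantile compute single-pass approximations rather than sorting, and the class and method names here are hypothetical.

```java
import java.util.Arrays;

public class SessionStatsSketch {
    // Exact empirical quantile over a sorted sample (nearest-rank method).
    static double quantile(double[] sorted, double q) {
        int idx = (int) Math.ceil(q * sorted.length) - 1;
        return sorted[Math.max(0, idx)];
    }

    // Population variance: mean of squared deviations from the mean.
    static double variance(double[] xs) {
        double mean = Arrays.stream(xs).average().orElse(0.0);
        return Arrays.stream(xs).map(x -> (x - mean) * (x - mean)).average().orElse(0.0);
    }

    public static void main(String[] args) {
        // Session lengths in minutes; one outlier dominates the variance
        double[] lengths = {1.0, 2.0, 3.0, 4.0, 100.0};
        Arrays.sort(lengths);
        System.out.println("median = " + quantile(lengths, 0.5));  // 3.0
        System.out.println("p95    = " + quantile(lengths, 0.95)); // 100.0
        System.out.println("var    = " + variance(lengths));       // 1522.0
    }
}
```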

Page 20:

DataFu: Session Statistics

Sessionize the data, appending a session ID to each tuple

pv = FOREACH pv GENERATE time, memberId;

pv_sessionized = FOREACH (GROUP pv BY memberId) {
  ordered = ORDER pv BY time;
  GENERATE FLATTEN(Sessionize(ordered)) AS (time, memberId, sessionId);
};

pv_sessionized = FOREACH pv_sessionized GENERATE sessionId, memberId, time;
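The gap-based rule Sessionize applies can be sketched in plain Java: walk one member's time-ordered events and start a new session whenever the gap since the previous event exceeds the timeout ('10m' above). This is illustrative only; the real UDF works on Pig bags of tuples, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionizeSketch {
    // Assign a session ID to each timestamp in a sorted list: a new session
    // starts whenever the gap since the previous event exceeds the timeout.
    static List<Integer> sessionize(List<Long> sortedTimesMillis, long timeoutMillis) {
        List<Integer> sessionIds = new ArrayList<>();
        int session = 0;
        long prev = Long.MIN_VALUE;
        for (long t : sortedTimesMillis) {
            if (prev != Long.MIN_VALUE && t - prev > timeoutMillis) {
                session++; // gap too large: start a new session
            }
            sessionIds.add(session);
            prev = t;
        }
        return sessionIds;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // Events at 0, 5, 30, and 31 minutes with a 10-minute timeout:
        // the 25-minute gap splits the stream into two sessions.
        List<Long> times = List.of(0L, 5 * min, 30 * min, 31 * min);
        System.out.println(sessionize(times, 10 * min)); // [0, 0, 1, 1]
    }
}
```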

Page 21:

DataFu: Session Statistics

Compute session length in minutes:

session_times = FOREACH (GROUP pv_sessionized BY (sessionId,memberId))
  GENERATE group.sessionId as sessionId,
           group.memberId as memberId,
           (MAX(pv_sessionized.time) - MIN(pv_sessionized.time)) / 1000.0 / 60.0 as session_length;

Compute session length statistics:

session_stats = FOREACH (GROUP session_times ALL) {
  ordered = ORDER session_times BY session_length;
  GENERATE AVG(ordered.session_length) as avg_session,
           SQRT(VAR(ordered.session_length)) as std_dev_session,
           Median(ordered.session_length) as median_session,
           Quantile(ordered.session_length) as quantiles_session;
};
DUMP session_stats;

Page 22:

DataFu: Session Statistics

Who are the most engaged users? Report users who had sessions longer than the 95th percentile:

long_sessions = filter session_times by session_length > session_stats.quantiles_session.quantile_0_95;

very_engaged_users = DISTINCT (FOREACH long_sessions GENERATE memberId);

DUMP very_engaged_users;

Page 23:

DataFu: Left join multiple relations

Suppose we have three data sets:

input1 = LOAD 'input1' using PigStorage(',') AS (key:INT,val:INT);
input2 = LOAD 'input2' using PigStorage(',') AS (key:INT,val:INT);
input3 = LOAD 'input3' using PigStorage(',') AS (key:INT,val:INT);

We want to left join input1 with input2 and input3. Unfortunately, in Pig you can only perform outer joins on two relations. This doesn't work:

joined = JOIN input1 BY key LEFT, input2 BY key, input3 BY key;

Page 24:

DataFu: Left join multiple relations

Instead you have to left join twice:

data1 = JOIN input1 BY key LEFT, input2 BY key;
data2 = JOIN data1 BY input1::key LEFT, input3 BY key;

This is inefficient, as it requires two MapReduce jobs! Left joins are very common. Take a recommendation system, for example:

– Typically you build a candidate set, then join in features.
– As the number of features increases, so can the number of joins.

Page 25:

DataFu: Left join multiple relations

But, there's always COGROUP:

data1 = COGROUP input1 BY key, input2 BY key, input3 BY key;
data2 = FOREACH data1 GENERATE
  FLATTEN(input1), -- left join on this
  FLATTEN((IsEmpty(input2) ? TOBAG(TOTUPLE((int)null,(int)null)) : input2)) as (input2::key,input2::val),
  FLATTEN((IsEmpty(input3) ? TOBAG(TOTUPLE((int)null,(int)null)) : input3)) as (input3::key,input3::val);

COGROUP is the same as GROUP.
– Convention: use COGROUP instead of GROUP for readability.

This is ugly and hard to follow, but it does work. The code wouldn't be so bad if it weren't for the nasty ternary expression. A perfect opportunity for writing a UDF.

Page 26:

DataFu: Left join multiple relations

We wrote EmptyBagToNullFields to replace this ternary logic. Much cleaner:

data1 = COGROUP input1 BY key, input2 BY key, input3 BY key;
data2 = FOREACH data1 GENERATE
  FLATTEN(input1), -- left join on this
  FLATTEN(EmptyBagToNullFields(input2)),
  FLATTEN(EmptyBagToNullFields(input3));
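The substitution EmptyBagToNullFields performs can be sketched in plain Java, modeling a bag as a list of rows: when a co-grouped side matched nothing, substitute a single all-null row so that flattening still emits one output row, which is exactly the left-join behavior. Class and method names here are hypothetical; the real UDF operates on Pig bags and infers the field count from the schema.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EmptyToNullRowSketch {
    // If the group matched nothing, return a single row of nulls of the
    // given width; otherwise return the matched rows unchanged.
    static List<Integer[]> emptyToNullRow(List<Integer[]> rows, int width) {
        if (rows.isEmpty()) {
            return Collections.singletonList(new Integer[width]); // all nulls
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Integer[]> matched = Collections.singletonList(new Integer[]{1, 42});
        List<Integer[]> unmatched = Collections.emptyList();
        // Matched side passes through untouched
        System.out.println(Arrays.toString(emptyToNullRow(matched, 2).get(0)));   // [1, 42]
        // Unmatched side becomes one null row instead of disappearing
        System.out.println(Arrays.toString(emptyToNullRow(unmatched, 2).get(0))); // [null, null]
    }
}
```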

Page 27:

Learning More
data.linkedin.com
