A Brief Tour of DataFu
DESCRIPTION
DataFu (now Apache DataFu) is a collection of user-defined functions for working with large-scale data in Hadoop and Pig. This library was born out of the need for a stable, well-tested library of UDFs for data mining and statistics. It is used at LinkedIn in many of our off-line workflows for data-derived products like “People You May Know” and “Skills”. It contains functions for:
* PageRank
* Quantiles (median), variance, etc.
* Sessionization
* Convenience bag functions (e.g., set operations, enumerating bags, etc.)
* Convenience utility functions (e.g., assertions, easier writing of EvalFuncs)
* and more…
Check out the project page: http://datafu.incubator.apache.org/

TRANSCRIPT
A Brief Tour of DataFu
Matthew Hayes
Staff Engineer, LinkedIn
About Me
• @LinkedIn for 2+ years
• Worked on skills & endorsements:
– http://data.linkedin.com/projects/skills-and-expertise
• Side projects:
– http://data.linkedin.com/
– http://data.linkedin.com/opensource/datafu
– http://data.linkedin.com/opensource/white-elephant
History of DataFu
• LinkedIn had lots of useful UDFs developed by several teams
• Problems:
– Not centralized, little code sharing
– No automated tests
• Solution:
– Centralized library
– Unit tests (PigUnit!)
– Code coverage (Cobertura)
• Open sourced September 2011
Examples
Session Statistics
• Suppose we have a stream of user clicks.
pv = LOAD 'pageviews.csv' USING PigStorage(',') AS (memberId:int, time:long, url:chararray);
• How to compute statistics on session length?
– Median
– Variance
– Percentiles (90th, 95th)
Session Statistics
• First, what is a session?
• Session: sustained user activity
• Let's assume a session ends when 10 minutes elapse with no activity.
• Define the Sessionize UDF:
DEFINE Sessionize datafu.pig.sessions.Sessionize('10m');
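Conceptually, Sessionize walks each user's time-ordered events and starts a new session whenever the gap between consecutive events exceeds the timeout. A minimal Python sketch of that idea (not DataFu's actual implementation; the names here are illustrative):

```python
from uuid import uuid4

def sessionize(timestamps, timeout_ms=10 * 60 * 1000):
    """Assign a session ID to each timestamp in a sorted stream.

    A new session starts whenever the gap between consecutive
    events exceeds the timeout (10 minutes, as in the slides).
    """
    sessions = []
    session_id = None
    prev = None
    for t in timestamps:
        if prev is None or t - prev > timeout_ms:
            session_id = str(uuid4())  # start a new session
        sessions.append((t, session_id))
        prev = t
    return sessions

# Three clicks within 10 minutes of each other, then one 30 minutes later:
clicks = [0, 60_000, 120_000, 1_920_000]
labeled = sessionize(clicks)
```

The first three clicks share one session ID; the last click gets a new one.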
• Need to convert the UNIX timestamp to an ISO string:
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
Session Statistics
• Define the statistics UDFs from DataFu:
DEFINE Median datafu.pig.stats.StreamingMedian();
DEFINE Quantile datafu.pig.stats.StreamingQuantile('0.90','0.95');
DEFINE VAR datafu.pig.stats.VAR();
• Streaming implementations are approximate– Contributed by Josh Wills (Cloudera)
• There are also non-streaming versions:
– Require sorted input
– Exact, but less efficient
Session Statistics
• Time in this example is a long.
• Sessionize needs an ISO string, so convert it:
pv = FOREACH pv GENERATE UnixToISO(time) as isoTime, time, memberId;
Session Statistics
• Sessionize each user's click stream:
pv_sessionized = FOREACH (GROUP pv BY memberId) {
  ordered = ORDER pv BY isoTime;
  GENERATE FLATTEN(Sessionize(ordered)) AS (isoTime, time, memberId, sessionId);
};
pv_sessionized = FOREACH pv_sessionized GENERATE sessionId, memberId, time;
• A session ID is appended to each tuple.
• All tuples within the same session have the same ID.
Session Statistics
• Compute session length in minutes:
session_times = FOREACH (GROUP pv_sessionized BY (sessionId, memberId))
  GENERATE group.sessionId AS sessionId,
           group.memberId AS memberId,
           (MAX(pv_sessionized.time) - MIN(pv_sessionized.time)) / 1000.0 / 60.0 AS session_length;
Session Statistics
• Compute session length statistics:
session_stats = FOREACH (GROUP session_times ALL) {
  GENERATE AVG(session_times.session_length) AS avg_session,
           SQRT(VAR(session_times.session_length)) AS std_dev_session,
           Median(session_times.session_length) AS median_session,
           Quantile(session_times.session_length) AS quantiles_session;
};
DUMP session_stats;
Session Statistics
• Which users had >95th percentile sessions?
long_sessions = FILTER session_times
  BY session_length > session_stats.quantiles_session.quantile_0_95;
very_engaged_users = DISTINCT (FOREACH long_sessions GENERATE memberId);
DUMP very_engaged_users;
Session Statistics
• What if we want to count views per page per user?
pv_counts = FOREACH (GROUP pv BY (memberId, url))
  GENERATE group.memberId AS memberId,
           group.url AS url,
           COUNT(pv) AS cnt;
• But refreshes and go-backs are not that significant.
• Multiple views across sessions are more meaningful.
Session Statistics
• Use TimeCount to sessionize the counts:
define TimeCount datafu.pig.date.TimeCount('10m');
pv_counts = FOREACH (GROUP pv BY (memberId, url)) {
  ordered = ORDER pv BY time;
  GENERATE group.memberId AS memberId,
           group.url AS url,
           TimeCount(ordered.(time)) AS cnt;
};
• Uses the same principle as Sessionize UDF.
ASSERT
• Filter function that blows up on 0.
data = filter data by ASSERT((memberId >= 0 ? 1 : 0), 'member ID was negative, doh!');
• Try it on 1,2,3,4,5,-1:– ERROR 2078: Caught error from UDF: datafu.pig.util.ASSERT
[Assertion violated: member ID was negative, doh!]
WilsonBinConf
• Computes a confidence interval for a proportion
• Assumes a binomial distribution
• For 99% confidence:
define WilsonBinConf datafu.pig.stats.WilsonBinConf('0.01');
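WilsonBinConf computes the standard Wilson score interval for a binomial proportion. A Python sketch of that formula (my own implementation of the textbook formula, not DataFu's code):

```python
from math import sqrt
from statistics import NormalDist

def wilson_interval(successes, total, alpha=0.01):
    """Wilson score interval for a binomial proportion.

    alpha=0.01 gives a 99% confidence interval, matching
    WilsonBinConf('0.01') in the slides.
    """
    z = NormalDist().inv_cdf(1 - alpha / 2)  # ~2.576 for 99%
    p_hat = successes / total
    denom = 1 + z * z / total
    center = (p_hat + z * z / (2 * total)) / denom
    half = z * sqrt(p_hat * (1 - p_hat) / total
                    + z * z / (4 * total * total)) / denom
    return center - half, center + half
```

Plugging in 6 heads out of 10 flips reproduces (approximately) the first interval on the results slide; the interval narrows as the number of flips grows.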
WilsonBinConf
• Example: Is a given coin fair?
• Collect samples, compute an interval for the proportion.
flips = LOAD 'flips.csv' using PigStorage() as (result:int);
flip_prop = FOREACH (GROUP flips ALL)
  GENERATE SUM(flips.result) AS success,
           COUNT(flips.result) AS total;
conf = FOREACH flip_prop GENERATE WilsonBinConf(success,total);
WilsonBinConf
• 10 flips: – ((0.24815974093858853,0.8720694404004281))
• 100 flips: – ((0.4518081551463118,0.6982365348191562))
• 10,000 flips: – ((0.4986209024723033,0.524363847383827))
• 100,000 flips: – ((0.4976073029016679,0.5057524741805967))
CountEach
• Suppose we have a recommendation system, and we've tracked what items have been recommended.
items = FOREACH items GENERATE memberId, itemId;
• We want to produce a bag of items shown to users with count for each item.
• Output should look like:
{memberId: int, items: {(itemId: long, cnt: long)}}
CountEach
• Typically, we would first count (member,item) pairs:
items = GROUP items BY (memberId, itemId);
items = FOREACH items GENERATE group.memberId AS memberId,
                               group.itemId AS itemId,
                               COUNT(items) AS cnt;
CountEach
• Then we would group again on member:
items = GROUP items BY memberId;
items = FOREACH items GENERATE group AS memberId,
                               items.(itemId, cnt) AS items;
• But, this requires two MR jobs.
CountEach
• Using CountEach, we can accomplish the same thing with one MR job and less code:
items = FOREACH (GROUP items BY memberId)
  GENERATE group AS memberId,
           CountEach(items.(itemId)) AS items;
• Better performance too! In one test I ran:– Wall clock time: 50% reduction– Total task time: 33% reduction
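Logically, CountEach makes a single pass over each member's bag, tallying occurrences of each item. In Python terms (illustrative only, with made-up item IDs):

```python
from collections import Counter

def count_each(items):
    """One-pass occurrence count, analogous to applying
    CountEach to the bag of itemIds for one member."""
    return list(Counter(items).items())

# Items shown to one (hypothetical) member:
shown = [42, 7, 42, 42, 7, 13]
counts = count_each(shown)  # → [(42, 3), (7, 2), (13, 1)]
```

Doing the count inside the single GROUP BY memberId is what saves the second MR job.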
AliasableEvalFunc
• Pig has great support for UDFs
• But UDFs with many positional parameters are sometimes error prone.
• Let's look at an example.
AliasableEvalFunc
• Suppose we want to compute monthly payments for various interest rates.
mortgage = LOAD 'mortgage.csv' USING PigStorage('|')
  AS (principal:double, num_payments:int,
      interest_rates: bag {tuple(interest_rate:double)});
AliasableEvalFunc
• Let's write a UDF to compute monthly payments.
• Get the input parameters:
@Override
public DataBag exec(Tuple input) throws IOException {
  Double principal = (Double)input.get(0);
  Integer numPayments = (Integer)input.get(1);
  DataBag interestRates = (DataBag)input.get(2);
// ...
AliasableEvalFunc
• Compute the monthly payment for each interest rate:
DataBag output = BagFactory.getInstance().newDefaultBag();
for (Tuple interestTuple : interestRates) {
  Double interest = (Double)interestTuple.get(0);
  double monthlyPayment = computeMonthlyPayment(principal, numPayments, interest);
  output.add(TupleFactory.getInstance().newTuple(monthlyPayment));
}
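The slides never show computeMonthlyPayment itself. Assuming it uses the standard fixed-rate amortization formula (an assumption on my part), a Python sketch:

```python
def monthly_payment(principal, num_payments, annual_rate):
    """Standard fixed-rate amortization formula:
    M = P * r / (1 - (1 + r)^-n), with r the monthly rate.

    This is an assumption about computeMonthlyPayment,
    which the slides do not show.
    """
    r = annual_rate / 12  # monthly interest rate
    if r == 0:
        return principal / num_payments  # interest-free edge case
    return principal * r / (1 - (1 + r) ** -num_payments)

# e.g. a $300,000 loan over 30 years (360 payments) at 5% APR
payment = monthly_payment(300_000, 360, 0.05)
```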
AliasableEvalFunc
• Apply the UDF:
payments = FOREACH mortgage GENERATE MortgagePayment(principal, num_payments, interest_rates);
• But we have to remember the correct order.
• This won't work:
payments = FOREACH mortgage GENERATE MortgagePayment(num_payments,principal,interest_rates);
AliasableEvalFunc
• AliasableEvalFunc to the rescue!
• Get the parameters by name:
Double principal = getDouble(input, "principal");
Integer numPayments = getInteger(input, "num_payments");
DataBag interestRates = getBag(input, "interest_rates");
AliasableEvalFunc
• Get each interest rate from the bag:
for (Tuple interestTuple : interestRates) {
  Double interest = getDouble(interestTuple,
      getPrefixedAliasName("interest_rates", "interest_rate"));
  // compute monthly payment...
}
AliasableEvalFunc
• Now order doesn't matter, as long as names are correct:
payments = FOREACH mortgage GENERATE MortgagePayment(principal, num_payments, interest_rates);
payments = FOREACH mortgage GENERATE MortgagePayment(num_payments, principal, interest_rates);
SetIntersect
• Set intersection of two or more sorted bags
define SetIntersect datafu.pig.bags.sets.SetIntersect();
-- input:
-- ({(2),(3),(4)},{(1),(2),(4),(8)})
input = FOREACH input {
  B1 = ORDER B1 BY val;
  B2 = ORDER B2 BY val;
  GENERATE SetIntersect(B1, B2);
}
-- output: ({(2),(4)})
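The sorted-input requirement lets the intersection run as a linear merge instead of building hash tables. A Python sketch of that merge idea (not DataFu's code):

```python
def set_intersect(a, b):
    """Intersect two sorted lists with a linear merge,
    the same idea SetIntersect applies to sorted bags."""
    out = []
    i = j = 0
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1  # advance the side with the smaller value
        else:
            j += 1
    return out

# Matches the slide's example:
# set_intersect([2, 3, 4], [1, 2, 4, 8]) → [2, 4]
```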
SetUnion
• Set union of two or more bags
define SetUnion datafu.pig.bags.sets.SetUnion();
-- input:
-- ({(2),(3),(4)},{(1),(2),(4),(8)})
output = FOREACH input GENERATE SetUnion(B1,B2);
-- output:
-- ({(2),(3),(4),(1),(8)})
BagConcat
• Concatenate tuples from set of bags
define BagConcat datafu.pig.bags.BagConcat();
-- input:
-- ({(1),(2),(3)},{(3),(4),(5)})
output = FOREACH input GENERATE BagConcat(A,B);
-- output:
-- ({(1),(2),(3),(3),(4),(5)})
What Else Is There?
• PageRank (in-memory implementation)
• WeightedSample
• NullToEmptyBag
• AppendToBag
• PrependToBag
• ...
Thanks!
• We welcome contributions:– https://github.com/linkedin/datafu